001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.IOException;
020    import java.io.Serializable;
021    import java.io.StringReader;
022    import java.io.StringWriter;
023    import java.util.HashMap;
024    import java.util.Locale;
025    import java.util.Map;
026    
027    import javax.jms.JMSException;
028    
029    import org.apache.activemq.advisory.AdvisorySupport;
030    import org.apache.activemq.broker.BrokerContext;
031    import org.apache.activemq.broker.BrokerContextAware;
032    import org.apache.activemq.command.ActiveMQMapMessage;
033    import org.apache.activemq.command.ActiveMQMessage;
034    import org.apache.activemq.command.ActiveMQObjectMessage;
035    import org.apache.activemq.command.DataStructure;
036    import org.codehaus.jettison.mapped.Configuration;
037    
038    import com.thoughtworks.xstream.XStream;
039    import com.thoughtworks.xstream.io.HierarchicalStreamReader;
040    import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
041    import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
042    import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
043    import com.thoughtworks.xstream.io.xml.XppReader;
044    import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
045    
046    /**
047     * Frame translator implementation that uses XStream to convert messages to and
048     * from XML and JSON
049     *
050     * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
051     */
052    public class JmsFrameTranslator extends LegacyFrameTranslator implements
053            BrokerContextAware {
054    
055        XStream xStream = null;
056        BrokerContext brokerContext;
057    
058        public ActiveMQMessage convertFrame(ProtocolConverter converter,
059                StompFrame command) throws JMSException, ProtocolException {
060            Map<String, String> headers = command.getHeaders();
061            ActiveMQMessage msg;
062            String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
063            if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
064                msg = super.convertFrame(converter, command);
065            } else {
066                HierarchicalStreamReader in;
067    
068                try {
069                    String text = new String(command.getContent(), "UTF-8");
070                    switch (Stomp.Transformations.getValue(transformation)) {
071                    case JMS_OBJECT_XML:
072                        in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
073                        msg = createObjectMessage(in);
074                        break;
075                    case JMS_OBJECT_JSON:
076                        in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
077                        msg = createObjectMessage(in);
078                        break;
079                    case JMS_MAP_XML:
080                        in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
081                        msg = createMapMessage(in);
082                        break;
083                    case JMS_MAP_JSON:
084                        in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
085                        msg = createMapMessage(in);
086                        break;
087                    default:
088                        throw new Exception("Unkown transformation: " + transformation);
089                    }
090                } catch (Throwable e) {
091                    command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
092                    msg = super.convertFrame(converter, command);
093                }
094            }
095            FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
096            return msg;
097        }
098    
099        public StompFrame convertMessage(ProtocolConverter converter,
100                ActiveMQMessage message) throws IOException, JMSException {
101            if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
102                StompFrame command = new StompFrame();
103                command.setAction(Stomp.Responses.MESSAGE);
104                Map<String, String> headers = new HashMap<String, String>(25);
105                command.setHeaders(headers);
106    
107                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
108                        converter, message, command, this);
109    
110                if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
111                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
112                } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
113                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
114                }
115    
116                ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
117                command.setContent(marshall(msg.getObject(),
118                        headers.get(Stomp.Headers.TRANSFORMATION))
119                        .getBytes("UTF-8"));
120                return command;
121    
122            } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
123                StompFrame command = new StompFrame();
124                command.setAction(Stomp.Responses.MESSAGE);
125                Map<String, String> headers = new HashMap<String, String>(25);
126                command.setHeaders(headers);
127    
128                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
129                        converter, message, command, this);
130    
131                if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
132                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
133                } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
134                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
135                }
136    
137                ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
138                command.setContent(marshall((Serializable)msg.getContentMap(),
139                        headers.get(Stomp.Headers.TRANSFORMATION))
140                        .getBytes("UTF-8"));
141                return command;
142            } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
143                    AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
144    
145                StompFrame command = new StompFrame();
146                command.setAction(Stomp.Responses.MESSAGE);
147                Map<String, String> headers = new HashMap<String, String>(25);
148                command.setHeaders(headers);
149    
150                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
151                        converter, message, command, this);
152    
153                if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
154                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
155                } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
156                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
157                }
158    
159                String body = marshallAdvisory(message.getDataStructure(),
160                        headers.get(Stomp.Headers.TRANSFORMATION));
161                command.setContent(body.getBytes("UTF-8"));
162                return command;
163            } else {
164                return super.convertMessage(converter, message);
165            }
166        }
167    
168        /**
169         * Marshalls the Object to a string using XML or JSON encoding
170         */
171        protected String marshall(Serializable object, String transformation)
172                throws JMSException {
173            StringWriter buffer = new StringWriter();
174            HierarchicalStreamWriter out;
175            if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) {
176                out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
177            } else {
178                out = new PrettyPrintWriter(buffer);
179            }
180            getXStream().marshal(object, out);
181            return buffer.toString();
182        }
183    
184        protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
185            ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
186            Object obj = getXStream().unmarshal(in);
187            objMsg.setObject((Serializable) obj);
188            return objMsg;
189        }
190    
191        @SuppressWarnings("unchecked")
192        protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
193            ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
194            Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
195            for (String key : map.keySet()) {
196                mapMsg.setObject(key, map.get(key));
197            }
198            return mapMsg;
199        }
200    
201        protected String marshallAdvisory(final DataStructure ds, String transformation) {
202    
203            StringWriter buffer = new StringWriter();
204            HierarchicalStreamWriter out;
205            if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) {
206                out = new JettisonMappedXmlDriver().createWriter(buffer);
207            } else {
208                out = new PrettyPrintWriter(buffer);
209            }
210    
211            XStream xstream = getXStream();
212            xstream.setMode(XStream.NO_REFERENCES);
213            xstream.aliasPackage("", "org.apache.activemq.command");
214            xstream.marshal(ds, out);
215            return buffer.toString();
216        }
217    
218        // Properties
219        // -------------------------------------------------------------------------
220        public XStream getXStream() {
221            if (xStream == null) {
222                xStream = createXStream();
223            }
224            return xStream;
225        }
226    
227        public void setXStream(XStream xStream) {
228            this.xStream = xStream;
229        }
230    
231        // Implementation methods
232        // -------------------------------------------------------------------------
233        @SuppressWarnings("unchecked")
234        protected XStream createXStream() {
235            XStream xstream = null;
236            if (brokerContext != null) {
237                Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
238                for (XStream bean : beans.values()) {
239                    if (bean != null) {
240                        xstream = bean;
241                        break;
242                    }
243                }
244            }
245    
246            if (xstream == null) {
247                xstream = new XStream();
248            }
249            return xstream;
250    
251        }
252    
253        public void setBrokerContext(BrokerContext brokerContext) {
254            this.brokerContext = brokerContext;
255        }
256    
257    }