001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.IOException;
020    import java.io.Serializable;
021    import java.io.StringReader;
022    import java.io.StringWriter;
023    import java.util.HashMap;
024    import java.util.Locale;
025    import java.util.Map;
026    
027    import javax.jms.JMSException;
028    
029    import org.apache.activemq.advisory.AdvisorySupport;
030    import org.apache.activemq.broker.BrokerContext;
031    import org.apache.activemq.broker.BrokerContextAware;
032    import org.apache.activemq.command.ActiveMQMapMessage;
033    import org.apache.activemq.command.ActiveMQMessage;
034    import org.apache.activemq.command.ActiveMQObjectMessage;
035    import org.apache.activemq.command.DataStructure;
036    import org.codehaus.jettison.mapped.Configuration;
037    import org.fusesource.hawtbuf.UTF8Buffer;
038    
039    import com.thoughtworks.xstream.XStream;
040    import com.thoughtworks.xstream.converters.basic.AbstractSingleValueConverter;
041    import com.thoughtworks.xstream.io.HierarchicalStreamReader;
042    import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
043    import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
044    import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
045    import com.thoughtworks.xstream.io.xml.XppReader;
046    import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
047    
048    /**
049     * Frame translator implementation that uses XStream to convert messages to and
050     * from XML and JSON
051     *
052     * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
053     */
054    public class JmsFrameTranslator extends LegacyFrameTranslator implements
055            BrokerContextAware {
056    
057        XStream xStream = null;
058        BrokerContext brokerContext;
059    
060        @Override
061        public ActiveMQMessage convertFrame(ProtocolConverter converter,
062                StompFrame command) throws JMSException, ProtocolException {
063            Map<String, String> headers = command.getHeaders();
064            ActiveMQMessage msg;
065            String transformation = headers.get(Stomp.Headers.TRANSFORMATION);
066            if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
067                msg = super.convertFrame(converter, command);
068            } else {
069                HierarchicalStreamReader in;
070    
071                try {
072                    String text = new String(command.getContent(), "UTF-8");
073                    switch (Stomp.Transformations.getValue(transformation)) {
074                    case JMS_OBJECT_XML:
075                        in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
076                        msg = createObjectMessage(in);
077                        break;
078                    case JMS_OBJECT_JSON:
079                        in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
080                        msg = createObjectMessage(in);
081                        break;
082                    case JMS_MAP_XML:
083                        in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
084                        msg = createMapMessage(in);
085                        break;
086                    case JMS_MAP_JSON:
087                        in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
088                        msg = createMapMessage(in);
089                        break;
090                    default:
091                        throw new Exception("Unkown transformation: " + transformation);
092                    }
093                } catch (Throwable e) {
094                    command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
095                    msg = super.convertFrame(converter, command);
096                }
097            }
098            FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
099            return msg;
100        }
101    
102        @Override
103        public StompFrame convertMessage(ProtocolConverter converter,
104                ActiveMQMessage message) throws IOException, JMSException {
105            if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
106                StompFrame command = new StompFrame();
107                command.setAction(Stomp.Responses.MESSAGE);
108                Map<String, String> headers = new HashMap<String, String>(25);
109                command.setHeaders(headers);
110    
111                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
112                        converter, message, command, this);
113    
114                if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
115                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
116                } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
117                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
118                }
119    
120                ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
121                command.setContent(marshall(msg.getObject(),
122                        headers.get(Stomp.Headers.TRANSFORMATION))
123                        .getBytes("UTF-8"));
124                return command;
125    
126            } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
127                StompFrame command = new StompFrame();
128                command.setAction(Stomp.Responses.MESSAGE);
129                Map<String, String> headers = new HashMap<String, String>(25);
130                command.setHeaders(headers);
131    
132                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
133                        converter, message, command, this);
134    
135                if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
136                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
137                } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
138                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
139                }
140    
141                ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
142                command.setContent(marshall((Serializable)msg.getContentMap(),
143                        headers.get(Stomp.Headers.TRANSFORMATION)).getBytes("UTF-8"));
144                return command;
145            } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
146                    AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
147    
148                StompFrame command = new StompFrame();
149                command.setAction(Stomp.Responses.MESSAGE);
150                Map<String, String> headers = new HashMap<String, String>(25);
151                command.setHeaders(headers);
152    
153                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
154                        converter, message, command, this);
155    
156                if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
157                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
158                } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
159                    headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
160                }
161    
162                String body = marshallAdvisory(message.getDataStructure(),
163                        headers.get(Stomp.Headers.TRANSFORMATION));
164                command.setContent(body.getBytes("UTF-8"));
165                return command;
166            } else {
167                return super.convertMessage(converter, message);
168            }
169        }
170    
171        /**
172         * Marshalls the Object to a string using XML or JSON encoding
173         */
174        protected String marshall(Serializable object, String transformation) throws JMSException {
175            StringWriter buffer = new StringWriter();
176            HierarchicalStreamWriter out;
177            if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) {
178                out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
179            } else {
180                out = new PrettyPrintWriter(buffer);
181            }
182            getXStream().marshal(object, out);
183            return buffer.toString();
184        }
185    
186        protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
187            ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
188            Object obj = getXStream().unmarshal(in);
189            objMsg.setObject((Serializable) obj);
190            return objMsg;
191        }
192    
193        @SuppressWarnings("unchecked")
194        protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
195            ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
196            Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
197            for (String key : map.keySet()) {
198                mapMsg.setObject(key, map.get(key));
199            }
200            return mapMsg;
201        }
202    
203        protected String marshallAdvisory(final DataStructure ds, String transformation) {
204    
205            StringWriter buffer = new StringWriter();
206            HierarchicalStreamWriter out;
207            if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) {
208                out = new JettisonMappedXmlDriver().createWriter(buffer);
209            } else {
210                out = new PrettyPrintWriter(buffer);
211            }
212    
213            XStream xstream = getXStream();
214            xstream.setMode(XStream.NO_REFERENCES);
215            xstream.aliasPackage("", "org.apache.activemq.command");
216            xstream.marshal(ds, out);
217            return buffer.toString();
218        }
219    
220        // Properties
221        // -------------------------------------------------------------------------
222        public XStream getXStream() {
223            if (xStream == null) {
224                xStream = createXStream();
225            }
226            return xStream;
227        }
228    
229        public void setXStream(XStream xStream) {
230            this.xStream = xStream;
231        }
232    
233        // Implementation methods
234        // -------------------------------------------------------------------------
235        @SuppressWarnings("unchecked")
236        protected XStream createXStream() {
237            XStream xstream = null;
238            if (brokerContext != null) {
239                Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
240                for (XStream bean : beans.values()) {
241                    if (bean != null) {
242                        xstream = bean;
243                        break;
244                    }
245                }
246            }
247    
248            if (xstream == null) {
249                xstream = new XStream();
250            }
251    
252            // For any object whose elements contains an UTF8Buffer instance instead of a String
253            // type we map it to String both in and out such that we don't marshal UTF8Buffers out
254            xstream.registerConverter(new AbstractSingleValueConverter() {
255    
256                @Override
257                public Object fromString(String str) {
258                    return str;
259                }
260    
261                @SuppressWarnings("rawtypes")
262                @Override
263                public boolean canConvert(Class type) {
264                    return type.equals(UTF8Buffer.class);
265                }
266            });
267    
268            xstream.alias("string", UTF8Buffer.class);
269    
270            return xstream;
271        }
272    
273        @Override
274        public void setBrokerContext(BrokerContext brokerContext) {
275            this.brokerContext = brokerContext;
276        }
277    }