001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.stomp;
018
019 import java.io.IOException;
020 import java.io.Serializable;
021 import java.io.StringReader;
022 import java.io.StringWriter;
023 import java.util.HashMap;
024 import java.util.Locale;
025 import java.util.Map;
026
027 import javax.jms.JMSException;
028
029 import org.apache.activemq.advisory.AdvisorySupport;
030 import org.apache.activemq.broker.BrokerContext;
031 import org.apache.activemq.broker.BrokerContextAware;
032 import org.apache.activemq.command.ActiveMQMapMessage;
033 import org.apache.activemq.command.ActiveMQMessage;
034 import org.apache.activemq.command.ActiveMQObjectMessage;
035 import org.apache.activemq.command.DataStructure;
036 import org.codehaus.jettison.mapped.Configuration;
037
038 import com.thoughtworks.xstream.XStream;
039 import com.thoughtworks.xstream.io.HierarchicalStreamReader;
040 import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
041 import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
042 import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
043 import com.thoughtworks.xstream.io.xml.XppReader;
044 import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
045
046 /**
047 * Frame translator implementation that uses XStream to convert messages to and
048 * from XML and JSON
049 *
050 * @author <a href="mailto:dejan@nighttale.net">Dejan Bosanac</a>
051 */
052 public class JmsFrameTranslator extends LegacyFrameTranslator implements
053 BrokerContextAware {
054
055 XStream xStream = null;
056 BrokerContext brokerContext;
057
058 public ActiveMQMessage convertFrame(ProtocolConverter converter,
059 StompFrame command) throws JMSException, ProtocolException {
060 Map<String, String> headers = command.getHeaders();
061 ActiveMQMessage msg;
062 String transformation = (String) headers.get(Stomp.Headers.TRANSFORMATION);
063 if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH) || transformation.equals(Stomp.Transformations.JMS_BYTE.toString())) {
064 msg = super.convertFrame(converter, command);
065 } else {
066 HierarchicalStreamReader in;
067
068 try {
069 String text = new String(command.getContent(), "UTF-8");
070 switch (Stomp.Transformations.getValue(transformation)) {
071 case JMS_OBJECT_XML:
072 in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
073 msg = createObjectMessage(in);
074 break;
075 case JMS_OBJECT_JSON:
076 in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
077 msg = createObjectMessage(in);
078 break;
079 case JMS_MAP_XML:
080 in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
081 msg = createMapMessage(in);
082 break;
083 case JMS_MAP_JSON:
084 in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
085 msg = createMapMessage(in);
086 break;
087 default:
088 throw new Exception("Unkown transformation: " + transformation);
089 }
090 } catch (Throwable e) {
091 command.getHeaders().put(Stomp.Headers.TRANSFORMATION_ERROR, e.getMessage());
092 msg = super.convertFrame(converter, command);
093 }
094 }
095 FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
096 return msg;
097 }
098
099 public StompFrame convertMessage(ProtocolConverter converter,
100 ActiveMQMessage message) throws IOException, JMSException {
101 if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
102 StompFrame command = new StompFrame();
103 command.setAction(Stomp.Responses.MESSAGE);
104 Map<String, String> headers = new HashMap<String, String>(25);
105 command.setHeaders(headers);
106
107 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
108 converter, message, command, this);
109
110 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
111 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_XML.toString());
112 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
113 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_OBJECT_JSON.toString());
114 }
115
116 ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
117 command.setContent(marshall(msg.getObject(),
118 headers.get(Stomp.Headers.TRANSFORMATION))
119 .getBytes("UTF-8"));
120 return command;
121
122 } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
123 StompFrame command = new StompFrame();
124 command.setAction(Stomp.Responses.MESSAGE);
125 Map<String, String> headers = new HashMap<String, String>(25);
126 command.setHeaders(headers);
127
128 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
129 converter, message, command, this);
130
131 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
132 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_XML.toString());
133 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
134 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_MAP_JSON.toString());
135 }
136
137 ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
138 command.setContent(marshall((Serializable)msg.getContentMap(),
139 headers.get(Stomp.Headers.TRANSFORMATION))
140 .getBytes("UTF-8"));
141 return command;
142 } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
143 AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
144
145 StompFrame command = new StompFrame();
146 command.setAction(Stomp.Responses.MESSAGE);
147 Map<String, String> headers = new HashMap<String, String>(25);
148 command.setHeaders(headers);
149
150 FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
151 converter, message, command, this);
152
153 if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_XML.toString())) {
154 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_XML.toString());
155 } else if (headers.get(Stomp.Headers.TRANSFORMATION).equals(Stomp.Transformations.JMS_JSON.toString())) {
156 headers.put(Stomp.Headers.TRANSFORMATION, Stomp.Transformations.JMS_ADVISORY_JSON.toString());
157 }
158
159 String body = marshallAdvisory(message.getDataStructure(),
160 headers.get(Stomp.Headers.TRANSFORMATION));
161 command.setContent(body.getBytes("UTF-8"));
162 return command;
163 } else {
164 return super.convertMessage(converter, message);
165 }
166 }
167
168 /**
169 * Marshalls the Object to a string using XML or JSON encoding
170 */
171 protected String marshall(Serializable object, String transformation)
172 throws JMSException {
173 StringWriter buffer = new StringWriter();
174 HierarchicalStreamWriter out;
175 if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) {
176 out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
177 } else {
178 out = new PrettyPrintWriter(buffer);
179 }
180 getXStream().marshal(object, out);
181 return buffer.toString();
182 }
183
184 protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
185 ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
186 Object obj = getXStream().unmarshal(in);
187 objMsg.setObject((Serializable) obj);
188 return objMsg;
189 }
190
191 @SuppressWarnings("unchecked")
192 protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
193 ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
194 Map<String, Object> map = (Map<String, Object>)getXStream().unmarshal(in);
195 for (String key : map.keySet()) {
196 mapMsg.setObject(key, map.get(key));
197 }
198 return mapMsg;
199 }
200
201 protected String marshallAdvisory(final DataStructure ds, String transformation) {
202
203 StringWriter buffer = new StringWriter();
204 HierarchicalStreamWriter out;
205 if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) {
206 out = new JettisonMappedXmlDriver().createWriter(buffer);
207 } else {
208 out = new PrettyPrintWriter(buffer);
209 }
210
211 XStream xstream = getXStream();
212 xstream.setMode(XStream.NO_REFERENCES);
213 xstream.aliasPackage("", "org.apache.activemq.command");
214 xstream.marshal(ds, out);
215 return buffer.toString();
216 }
217
218 // Properties
219 // -------------------------------------------------------------------------
220 public XStream getXStream() {
221 if (xStream == null) {
222 xStream = createXStream();
223 }
224 return xStream;
225 }
226
227 public void setXStream(XStream xStream) {
228 this.xStream = xStream;
229 }
230
231 // Implementation methods
232 // -------------------------------------------------------------------------
233 @SuppressWarnings("unchecked")
234 protected XStream createXStream() {
235 XStream xstream = null;
236 if (brokerContext != null) {
237 Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
238 for (XStream bean : beans.values()) {
239 if (bean != null) {
240 xstream = bean;
241 break;
242 }
243 }
244 }
245
246 if (xstream == null) {
247 xstream = new XStream();
248 }
249 return xstream;
250
251 }
252
253 public void setBrokerContext(BrokerContext brokerContext) {
254 this.brokerContext = brokerContext;
255 }
256
257 }