001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.DataOutputStream;
020    import java.io.IOException;
021    import java.util.HashMap;
022    import java.util.Map;
023    
024    import javax.jms.Destination;
025    import javax.jms.JMSException;
026    
027    import org.apache.activemq.advisory.AdvisorySupport;
028    import org.apache.activemq.command.ActiveMQBytesMessage;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQMessage;
031    import org.apache.activemq.command.ActiveMQTextMessage;
032    import org.apache.activemq.command.DataStructure;
033    import org.apache.activemq.util.ByteArrayOutputStream;
034    import org.apache.activemq.util.ByteSequence;
035    
036    import com.thoughtworks.xstream.XStream;
037    import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
038    
039    /**
040     * Implements ActiveMQ 4.0 translations
041     */
042    public class LegacyFrameTranslator implements FrameTranslator {
043    
044        public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
045            final Map<?, ?> headers = command.getHeaders();
046            final ActiveMQMessage msg;
047            /*
048             * To reduce the complexity of this method perhaps a Chain of Responsibility
049             * would be a better implementation
050             */
051            if (headers.containsKey(Stomp.Headers.AMQ_MESSAGE_TYPE)) {
052                String intendedType = (String)headers.get(Stomp.Headers.AMQ_MESSAGE_TYPE);
053                if(intendedType.equalsIgnoreCase("text")){
054                    ActiveMQTextMessage text = new ActiveMQTextMessage();
055                    try {
056                        ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
057                        DataOutputStream data = new DataOutputStream(bytes);
058                        data.writeInt(command.getContent().length);
059                        data.write(command.getContent());
060                        text.setContent(bytes.toByteSequence());
061                        data.close();
062                    } catch (Throwable e) {
063                        throw new ProtocolException("Text could not bet set: " + e, false, e);
064                    }
065                    msg = text;
066                } else if(intendedType.equalsIgnoreCase("bytes")) {
067                    ActiveMQBytesMessage byteMessage = new ActiveMQBytesMessage();
068                    byteMessage.writeBytes(command.getContent());
069                    msg = byteMessage;
070                } else {
071                    throw new ProtocolException("Unsupported message type '"+intendedType+"'",false);
072                }
073            }else if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
074                headers.remove(Stomp.Headers.CONTENT_LENGTH);
075                ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
076                bm.writeBytes(command.getContent());
077                msg = bm;
078            } else {
079                ActiveMQTextMessage text = new ActiveMQTextMessage();
080                try {
081                    ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
082                    DataOutputStream data = new DataOutputStream(bytes);
083                    data.writeInt(command.getContent().length);
084                    data.write(command.getContent());
085                    text.setContent(bytes.toByteSequence());
086                    data.close();
087                } catch (Throwable e) {
088                    throw new ProtocolException("Text could not bet set: " + e, false, e);
089                }
090                msg = text;
091            }
092            FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
093            return msg;
094        }
095    
096        public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
097            StompFrame command = new StompFrame();
098            command.setAction(Stomp.Responses.MESSAGE);
099            Map<String, String> headers = new HashMap<String, String>(25);
100            command.setHeaders(headers);
101    
102            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
103    
104            if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
105    
106                if (!message.isCompressed() && message.getContent() != null) {
107                    ByteSequence msgContent = message.getContent();
108                    if (msgContent.getLength() > 4) {
109                        byte[] content = new byte[msgContent.getLength() - 4];
110                        System.arraycopy(msgContent.data, 4, content, 0, content.length);
111                        command.setContent(content);
112                    }
113                } else {
114                    ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
115                    String messageText = msg.getText();
116                    if (messageText != null) {
117                        command.setContent(msg.getText().getBytes("UTF-8"));
118                    }
119                }
120    
121            } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
122    
123                ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
124                msg.setReadOnlyBody(true);
125                byte[] data = new byte[(int)msg.getBodyLength()];
126                msg.readBytes(data);
127    
128                headers.put(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length));
129                command.setContent(data);
130            } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
131                    AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
132    
133                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
134                        converter, message, command, this);
135    
136                String body = marshallAdvisory(message.getDataStructure());
137                command.setContent(body.getBytes("UTF-8"));
138            }
139            return command;
140        }
141    
142        public String convertDestination(ProtocolConverter converter, Destination d) {
143            if (d == null) {
144                return null;
145            }
146            ActiveMQDestination activeMQDestination = (ActiveMQDestination)d;
147            String physicalName = activeMQDestination.getPhysicalName();
148    
149            String rc = converter.getCreatedTempDestinationName(activeMQDestination);
150            if( rc!=null ) {
151                return rc;
152            }
153    
154            StringBuilder buffer = new StringBuilder();
155            if (activeMQDestination.isQueue()) {
156                if (activeMQDestination.isTemporary()) {
157                    buffer.append("/remote-temp-queue/");
158                } else {
159                    buffer.append("/queue/");
160                }
161            } else {
162                if (activeMQDestination.isTemporary()) {
163                    buffer.append("/remote-temp-topic/");
164                } else {
165                    buffer.append("/topic/");
166                }
167            }
168            buffer.append(physicalName);
169            return buffer.toString();
170        }
171    
172        public ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException {
173            if (name == null) {
174                return null;
175            }
176    
177            // in case of space padding by a client we trim for the initial detection, on fallback use
178            // the un-trimmed value.
179            String originalName = name;
180            name = name.trim();
181    
182            if (name.startsWith("/queue/")) {
183                String qName = name.substring("/queue/".length(), name.length());
184                return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE);
185            } else if (name.startsWith("/topic/")) {
186                String tName = name.substring("/topic/".length(), name.length());
187                return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
188            } else if (name.startsWith("/remote-temp-queue/")) {
189                String tName = name.substring("/remote-temp-queue/".length(), name.length());
190                return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
191            } else if (name.startsWith("/remote-temp-topic/")) {
192                String tName = name.substring("/remote-temp-topic/".length(), name.length());
193                return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
194            } else if (name.startsWith("/temp-queue/")) {
195                return converter.createTempDestination(name, false);
196            } else if (name.startsWith("/temp-topic/")) {
197                return converter.createTempDestination(name, true);
198            } else {
199                if (forceFallback) {
200                    try {
201                        ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(originalName);
202                        if (fallback != null) {
203                            return fallback;
204                        }
205                    } catch (JMSException e) {
206                        throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations "
207                                + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e);
208                    }
209                }
210                throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations "
211                                            + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
212            }
213        }
214    
215        /**
216         * Return an Advisory message as a JSON formatted string
217         * @param ds
218         * @return
219         */
220        protected String marshallAdvisory(final DataStructure ds) {
221            XStream xstream = new XStream(new JsonHierarchicalStreamDriver());
222            xstream.setMode(XStream.NO_REFERENCES);
223            xstream.aliasPackage("", "org.apache.activemq.command");
224            return xstream.toXML(ds);
225        }
226    }