001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.stomp;
018    
019    import java.io.DataOutputStream;
020    import java.io.IOException;
021    import java.util.HashMap;
022    import java.util.Map;
023    
024    import javax.jms.Destination;
025    import javax.jms.JMSException;
026    
027    import org.apache.activemq.advisory.AdvisorySupport;
028    import org.apache.activemq.command.ActiveMQBytesMessage;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQMessage;
031    import org.apache.activemq.command.ActiveMQTextMessage;
032    import org.apache.activemq.command.DataStructure;
033    import org.apache.activemq.util.ByteArrayOutputStream;
034    import org.apache.activemq.util.ByteSequence;
035    
036    import com.thoughtworks.xstream.XStream;
037    import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
038    
039    /**
040     * Implements ActiveMQ 4.0 translations
041     */
042    public class LegacyFrameTranslator implements FrameTranslator {
043    
044    
045        public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
046            final Map<?, ?> headers = command.getHeaders();
047            final ActiveMQMessage msg;
048            /*
049             * To reduce the complexity of this method perhaps a Chain of Responsibility
050             * would be a better implementation
051             */
052            if (headers.containsKey(Stomp.Headers.AMQ_MESSAGE_TYPE)) {
053                String intendedType = (String)headers.get(Stomp.Headers.AMQ_MESSAGE_TYPE);
054                if(intendedType.equalsIgnoreCase("text")){
055                    ActiveMQTextMessage text = new ActiveMQTextMessage();
056                    try {
057                        ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
058                        DataOutputStream data = new DataOutputStream(bytes);
059                        data.writeInt(command.getContent().length);
060                        data.write(command.getContent());
061                        text.setContent(bytes.toByteSequence());
062                    } catch (Throwable e) {
063                        throw new ProtocolException("Text could not bet set: " + e, false, e);
064                    }
065                    msg = text;
066                } else if(intendedType.equalsIgnoreCase("bytes")) {
067                    ActiveMQBytesMessage byteMessage = new ActiveMQBytesMessage();
068                    byteMessage.writeBytes(command.getContent());
069                    msg = byteMessage;
070                } else {
071                    throw new ProtocolException("Unsupported message type '"+intendedType+"'",false);
072                }
073            }else if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
074                headers.remove(Stomp.Headers.CONTENT_LENGTH);
075                ActiveMQBytesMessage bm = new ActiveMQBytesMessage();
076                bm.writeBytes(command.getContent());
077                msg = bm;
078            } else {
079                ActiveMQTextMessage text = new ActiveMQTextMessage();
080                try {
081                    ByteArrayOutputStream bytes = new ByteArrayOutputStream(command.getContent().length + 4);
082                    DataOutputStream data = new DataOutputStream(bytes);
083                    data.writeInt(command.getContent().length);
084                    data.write(command.getContent());
085                    text.setContent(bytes.toByteSequence());
086                } catch (Throwable e) {
087                    throw new ProtocolException("Text could not bet set: " + e, false, e);
088                }
089                msg = text;
090            }
091            FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
092            return msg;
093        }
094    
095        public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
096            StompFrame command = new StompFrame();
097            command.setAction(Stomp.Responses.MESSAGE);
098            Map<String, String> headers = new HashMap<String, String>(25);
099            command.setHeaders(headers);
100    
101            FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
102    
103            if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
104    
105                if (!message.isCompressed() && message.getContent() != null) {
106                    ByteSequence msgContent = message.getContent();
107                    if (msgContent.getLength() > 4) {
108                        byte[] content = new byte[msgContent.getLength() - 4];
109                        System.arraycopy(msgContent.data, 4, content, 0, content.length);
110                        command.setContent(content);
111                    }
112                } else {
113                    ActiveMQTextMessage msg = (ActiveMQTextMessage)message.copy();
114                    String messageText = msg.getText();
115                    if (messageText != null) {
116                        command.setContent(msg.getText().getBytes("UTF-8"));
117                    }
118                }
119    
120            } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
121    
122                ActiveMQBytesMessage msg = (ActiveMQBytesMessage)message.copy();
123                msg.setReadOnlyBody(true);
124                byte[] data = new byte[(int)msg.getBodyLength()];
125                msg.readBytes(data);
126    
127                headers.put(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length));
128                command.setContent(data);
129            } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE &&
130                    AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
131    
132                FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(
133                        converter, message, command, this);
134    
135                String body = marshallAdvisory(message.getDataStructure());
136                command.setContent(body.getBytes("UTF-8"));
137            }
138            return command;
139        }
140    
141        public String convertDestination(ProtocolConverter converter, Destination d) {
142            if (d == null) {
143                return null;
144            }
145            ActiveMQDestination activeMQDestination = (ActiveMQDestination)d;
146            String physicalName = activeMQDestination.getPhysicalName();
147    
148            String rc = converter.getCreatedTempDestinationName(activeMQDestination);
149            if( rc!=null ) {
150                return rc;
151            }
152    
153            StringBuilder buffer = new StringBuilder();
154            if (activeMQDestination.isQueue()) {
155                if (activeMQDestination.isTemporary()) {
156                    buffer.append("/remote-temp-queue/");
157                } else {
158                    buffer.append("/queue/");
159                }
160            } else {
161                if (activeMQDestination.isTemporary()) {
162                    buffer.append("/remote-temp-topic/");
163                } else {
164                    buffer.append("/topic/");
165                }
166            }
167            buffer.append(physicalName);
168            return buffer.toString();
169        }
170    
171        public ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException {
172            if (name == null) {
173                return null;
174            }
175    
176            // in case of space padding by a client we trim for the initial detection, on fallback use
177            // the un-trimmed value.
178            String originalName = name;
179            name = name.trim();
180    
181            if (name.startsWith("/queue/")) {
182                String qName = name.substring("/queue/".length(), name.length());
183                return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE);
184            } else if (name.startsWith("/topic/")) {
185                String tName = name.substring("/topic/".length(), name.length());
186                return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
187            } else if (name.startsWith("/remote-temp-queue/")) {
188                String tName = name.substring("/remote-temp-queue/".length(), name.length());
189                return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
190            } else if (name.startsWith("/remote-temp-topic/")) {
191                String tName = name.substring("/remote-temp-topic/".length(), name.length());
192                return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
193            } else if (name.startsWith("/temp-queue/")) {
194                return converter.createTempDestination(name, false);
195            } else if (name.startsWith("/temp-topic/")) {
196                return converter.createTempDestination(name, true);
197            } else {
198                if (forceFallback) {
199                    try {
200                        ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(originalName);
201                        if (fallback != null) {
202                            return fallback;
203                        }
204                    } catch (JMSException e) {
205                        throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations "
206                                + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e);
207                    }
208                }
209                throw new ProtocolException("Illegal destination name: [" + originalName + "] -- ActiveMQ STOMP destinations "
210                                            + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
211            }
212        }
213    
214        /**
215         * Return an Advisory message as a JSON formatted string
216         * @param ds
217         * @return
218         */
219        protected String marshallAdvisory(final DataStructure ds) {
220            XStream xstream = new XStream(new JsonHierarchicalStreamDriver());
221            xstream.setMode(XStream.NO_REFERENCES);
222            xstream.aliasPackage("", "org.apache.activemq.command");
223            return xstream.toXML(ds);
224        }
225    }