001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.net.MalformedURLException;
020    import java.util.Enumeration;
021    
022    import javax.jms.BytesMessage;
023    import javax.jms.Destination;
024    import javax.jms.JMSException;
025    import javax.jms.MapMessage;
026    import javax.jms.Message;
027    import javax.jms.MessageEOFException;
028    import javax.jms.ObjectMessage;
029    import javax.jms.Queue;
030    import javax.jms.StreamMessage;
031    import javax.jms.TemporaryQueue;
032    import javax.jms.TemporaryTopic;
033    import javax.jms.TextMessage;
034    import javax.jms.Topic;
035    
036    import org.apache.activemq.blob.BlobDownloader;
037    import org.apache.activemq.blob.BlobUploader;
038    import org.apache.activemq.command.ActiveMQBlobMessage;
039    import org.apache.activemq.command.ActiveMQBytesMessage;
040    import org.apache.activemq.command.ActiveMQDestination;
041    import org.apache.activemq.command.ActiveMQMapMessage;
042    import org.apache.activemq.command.ActiveMQMessage;
043    import org.apache.activemq.command.ActiveMQObjectMessage;
044    import org.apache.activemq.command.ActiveMQQueue;
045    import org.apache.activemq.command.ActiveMQStreamMessage;
046    import org.apache.activemq.command.ActiveMQTempQueue;
047    import org.apache.activemq.command.ActiveMQTempTopic;
048    import org.apache.activemq.command.ActiveMQTextMessage;
049    import org.apache.activemq.command.ActiveMQTopic;
050    
051    /**
052     * A helper class for converting normal JMS interfaces into ActiveMQ specific
053     * ones.
054     * 
055     * 
056     */
057    public final class ActiveMQMessageTransformation {
058    
059        private ActiveMQMessageTransformation() {    
060        }
061        
062        /**
063         * Creates a an available JMS message from another provider.
064         * 
065         * @param destination - Destination to be converted into ActiveMQ's
066         *                implementation.
067         * @return ActiveMQDestination - ActiveMQ's implementation of the
068         *         destination.
069         * @throws JMSException if an error occurs
070         */
071        public static ActiveMQDestination transformDestination(Destination destination) throws JMSException {
072            ActiveMQDestination activeMQDestination = null;
073    
074            if (destination != null) {
075                if (destination instanceof ActiveMQDestination) {
076                    return (ActiveMQDestination)destination;
077    
078                } else {
079                    if (destination instanceof TemporaryQueue) {
080                        activeMQDestination = new ActiveMQTempQueue(((Queue)destination).getQueueName());
081                    } else if (destination instanceof TemporaryTopic) {
082                        activeMQDestination = new ActiveMQTempTopic(((Topic)destination).getTopicName());
083                    } else if (destination instanceof Queue) {
084                        activeMQDestination = new ActiveMQQueue(((Queue)destination).getQueueName());
085                    } else if (destination instanceof Topic) {
086                        activeMQDestination = new ActiveMQTopic(((Topic)destination).getTopicName());
087                    }
088                }
089            }
090    
091            return activeMQDestination;
092        }
093    
094        /**
095         * Creates a fast shallow copy of the current ActiveMQMessage or creates a
096         * whole new message instance from an available JMS message from another
097         * provider.
098         * 
099         * @param message - Message to be converted into ActiveMQ's implementation.
100         * @param connection
101         * @return ActiveMQMessage - ActiveMQ's implementation object of the
102         *         message.
103         * @throws JMSException if an error occurs
104         */
105        public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection)
106            throws JMSException {
107            if (message instanceof ActiveMQMessage) {
108                return (ActiveMQMessage)message;
109    
110            } else {
111                ActiveMQMessage activeMessage = null;
112    
113                if (message instanceof BytesMessage) {
114                    BytesMessage bytesMsg = (BytesMessage)message;
115                    bytesMsg.reset();
116                    ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
117                    msg.setConnection(connection);
118                    try {
119                        for (;;) {
120                            // Reads a byte from the message stream until the stream
121                            // is empty
122                            msg.writeByte(bytesMsg.readByte());
123                        }
124                    } catch (MessageEOFException e) {
125                        // if an end of message stream as expected
126                    } catch (JMSException e) {
127                    }
128    
129                    activeMessage = msg;
130                } else if (message instanceof MapMessage) {
131                    MapMessage mapMsg = (MapMessage)message;
132                    ActiveMQMapMessage msg = new ActiveMQMapMessage();
133                    msg.setConnection(connection);
134                    Enumeration iter = mapMsg.getMapNames();
135    
136                    while (iter.hasMoreElements()) {
137                        String name = iter.nextElement().toString();
138                        msg.setObject(name, mapMsg.getObject(name));
139                    }
140    
141                    activeMessage = msg;
142                } else if (message instanceof ObjectMessage) {
143                    ObjectMessage objMsg = (ObjectMessage)message;
144                    ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
145                    msg.setConnection(connection);
146                    msg.setObject(objMsg.getObject());
147                    msg.storeContent();
148                    activeMessage = msg;
149                } else if (message instanceof StreamMessage) {
150                    StreamMessage streamMessage = (StreamMessage)message;
151                    streamMessage.reset();
152                    ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
153                    msg.setConnection(connection);
154                    Object obj = null;
155    
156                    try {
157                        while ((obj = streamMessage.readObject()) != null) {
158                            msg.writeObject(obj);
159                        }
160                    } catch (MessageEOFException e) {
161                        // if an end of message stream as expected
162                    } catch (JMSException e) {
163                    }
164    
165                    activeMessage = msg;
166                } else if (message instanceof TextMessage) {
167                    TextMessage textMsg = (TextMessage)message;
168                    ActiveMQTextMessage msg = new ActiveMQTextMessage();
169                    msg.setConnection(connection);
170                    msg.setText(textMsg.getText());
171                    activeMessage = msg;
172                } else if (message instanceof BlobMessage) {
173                    BlobMessage blobMessage = (BlobMessage)message;
174                    ActiveMQBlobMessage msg = new ActiveMQBlobMessage();
175                    msg.setConnection(connection);
176                    msg.setBlobDownloader(new BlobDownloader(connection.getBlobTransferPolicy()));
177                    try {
178                                            msg.setURL(blobMessage.getURL());
179                                    } catch (MalformedURLException e) {
180                                            
181                                    }
182                    activeMessage = msg;
183                } else {
184                    activeMessage = new ActiveMQMessage();
185                    activeMessage.setConnection(connection);
186                }
187    
188                copyProperties(message, activeMessage);
189    
190                return activeMessage;
191            }
192        }
193    
194        /**
195         * Copies the standard JMS and user defined properties from the givem
196         * message to the specified message
197         * 
198         * @param fromMessage the message to take the properties from
199         * @param toMessage the message to add the properties to
200         * @throws JMSException
201         */
202        public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException {
203            toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
204            toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
205            toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
206            toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
207            toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
208            toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
209            toMessage.setJMSType(fromMessage.getJMSType());
210            toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
211            toMessage.setJMSPriority(fromMessage.getJMSPriority());
212            toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp());
213    
214            Enumeration propertyNames = fromMessage.getPropertyNames();
215    
216            while (propertyNames.hasMoreElements()) {
217                String name = propertyNames.nextElement().toString();
218                Object obj = fromMessage.getObjectProperty(name);
219                toMessage.setObjectProperty(name, obj);
220            }
221        }
222    }