001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.command;
019    
020    import java.io.DataInputStream;
021    import java.io.DataOutputStream;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.ObjectOutputStream;
025    import java.io.OutputStream;
026    import java.io.Serializable;
027    import java.util.zip.DeflaterOutputStream;
028    import java.util.zip.InflaterInputStream;
029    
030    import javax.jms.JMSException;
031    import javax.jms.ObjectMessage;
032    
033    import org.apache.activemq.ActiveMQConnection;
034    import org.apache.activemq.util.ByteArrayInputStream;
035    import org.apache.activemq.util.ByteArrayOutputStream;
036    import org.apache.activemq.util.ByteSequence;
037    import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
038    import org.apache.activemq.util.JMSExceptionSupport;
039    import org.apache.activemq.wireformat.WireFormat;
040    
041    /**
042     * An <CODE>ObjectMessage</CODE> object is used to send a message that
043     * contains a serializable object in the Java programming language ("Java
044     * object"). It inherits from the <CODE>Message</CODE> interface and adds a
045     * body containing a single reference to an object. Only
046     * <CODE>Serializable</CODE> Java objects can be used. <p/>
047     * <P>
048     * If a collection of Java objects must be sent, one of the
049     * <CODE>Collection</CODE> classes provided since JDK 1.2 can be used. <p/>
050     * <P>
051     * When a client receives an <CODE>ObjectMessage</CODE>, it is in read-only
052     * mode. If a client attempts to write to the message at this point, a
053     * <CODE>MessageNotWriteableException</CODE> is thrown. If
054     * <CODE>clearBody</CODE> is called, the message can now be both read from and
055     * written to.
056     *
057     * @openwire:marshaller code="26"
058     * @see javax.jms.Session#createObjectMessage()
059     * @see javax.jms.Session#createObjectMessage(Serializable)
060     * @see javax.jms.BytesMessage
061     * @see javax.jms.MapMessage
062     * @see javax.jms.Message
063     * @see javax.jms.StreamMessage
064     * @see javax.jms.TextMessage
065     */
066    public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage {
067    
068        // TODO: verify classloader
069        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_OBJECT_MESSAGE;
070        static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader();
071    
072        protected transient Serializable object;
073    
074        public Message copy() {
075            ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
076            copy(copy);
077            return copy;
078        }
079    
080        private void copy(ActiveMQObjectMessage copy) {
081            ActiveMQConnection connection = getConnection();
082            if (connection == null || !connection.isObjectMessageSerializationDefered()) {
083                storeContent();
084                copy.object = null;
085            } else {
086                copy.object = object;
087            }
088            super.copy(copy);
089    
090        }
091    
092        @Override
093        public void storeContent() {
094            ByteSequence bodyAsBytes = getContent();
095            if (bodyAsBytes == null && object != null) {
096                try {
097                    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
098                    OutputStream os = bytesOut;
099                    ActiveMQConnection connection = getConnection();
100                    if (connection != null && connection.isUseCompression()) {
101                        compressed = true;
102                        os = new DeflaterOutputStream(os);
103                    }
104                    DataOutputStream dataOut = new DataOutputStream(os);
105                    ObjectOutputStream objOut = new ObjectOutputStream(dataOut);
106                    objOut.writeObject(object);
107                    objOut.flush();
108                    objOut.reset();
109                    objOut.close();
110                    setContent(bytesOut.toByteSequence());
111                } catch (IOException ioe) {
112                    throw new RuntimeException(ioe.getMessage(), ioe);
113                }
114            }
115        }
116    
117        public byte getDataStructureType() {
118            return DATA_STRUCTURE_TYPE;
119        }
120    
121        public String getJMSXMimeType() {
122            return "jms/object-message";
123        }
124    
125        /**
126         * Clears out the message body. Clearing a message's body does not clear its
127         * header values or property entries. <p/>
128         * <P>
129         * If this message body was read-only, calling this method leaves the
130         * message body in the same state as an empty body in a newly created
131         * message.
132         *
133         * @throws JMSException if the JMS provider fails to clear the message body
134         *                 due to some internal error.
135         */
136    
137        public void clearBody() throws JMSException {
138            super.clearBody();
139            this.object = null;
140        }
141    
142        /**
143         * Sets the serializable object containing this message's data. It is
144         * important to note that an <CODE>ObjectMessage</CODE> contains a
145         * snapshot of the object at the time <CODE>setObject()</CODE> is called;
146         * subsequent modifications of the object will have no effect on the
147         * <CODE>ObjectMessage</CODE> body.
148         *
149         * @param newObject the message's data
150         * @throws JMSException if the JMS provider fails to set the object due to
151         *                 some internal error.
152         * @throws javax.jms.MessageFormatException if object serialization fails.
153         * @throws javax.jms.MessageNotWriteableException if the message is in
154         *                 read-only mode.
155         */
156    
157        public void setObject(Serializable newObject) throws JMSException {
158            checkReadOnlyBody();
159            this.object = newObject;
160            setContent(null);
161            ActiveMQConnection connection = getConnection();
162            if (connection == null || !connection.isObjectMessageSerializationDefered()) {
163                storeContent();
164            }
165        }
166    
167        /**
168         * Gets the serializable object containing this message's data. The default
169         * value is null.
170         *
171         * @return the serializable object containing this message's data
172         * @throws JMSException
173         */
174        public Serializable getObject() throws JMSException {
175            if (object == null && getContent() != null) {
176                try {
177                    ByteSequence content = getContent();
178                    InputStream is = new ByteArrayInputStream(content);
179                    if (isCompressed()) {
180                        is = new InflaterInputStream(is);
181                    }
182                    DataInputStream dataIn = new DataInputStream(is);
183                    ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
184                    try {
185                        object = (Serializable)objIn.readObject();
186                    } catch (ClassNotFoundException ce) {
187                        throw JMSExceptionSupport.create("Failed to build body from content. Serializable class not available to broker. Reason: " + ce, ce);
188                    } finally {
189                        dataIn.close();
190                    }
191                } catch (IOException e) {
192                    throw JMSExceptionSupport.create("Failed to build body from bytes. Reason: " + e, e);
193                }
194            }
195            return this.object;
196        }
197    
198        @Override
199        public void beforeMarshall(WireFormat wireFormat) throws IOException {
200            super.beforeMarshall(wireFormat);
201            // may have initiated on vm transport with deferred marshalling
202            storeContent();
203        }
204    
205        public void clearMarshalledState() throws JMSException {
206            super.clearMarshalledState();
207            this.object = null;
208        }
209    
210        public void onMessageRolledBack() {
211            super.onMessageRolledBack();
212    
213            // lets force the object to be deserialized again - as we could have
214            // changed the object
215            object = null;
216        }
217    
218        @Override
219        public void compress() throws IOException {
220            storeContent();
221            super.compress();
222        }
223    
224        public String toString() {
225            try {
226                getObject();
227            } catch (JMSException e) {
228            }
229            return super.toString();
230        }
231    }