001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.command;
019    
020    import java.io.BufferedInputStream;
021    import java.io.DataInputStream;
022    import java.io.DataOutputStream;
023    import java.io.EOFException;
024    import java.io.IOException;
025    import java.io.InputStream;
026    import java.io.OutputStream;
027    import java.util.zip.DeflaterOutputStream;
028    import java.util.zip.InflaterInputStream;
029    
030    import javax.jms.JMSException;
031    import javax.jms.MessageEOFException;
032    import javax.jms.MessageFormatException;
033    import javax.jms.MessageNotReadableException;
034    import javax.jms.MessageNotWriteableException;
035    import javax.jms.StreamMessage;
036    
037    import org.apache.activemq.ActiveMQConnection;
038    import org.apache.activemq.util.ByteArrayInputStream;
039    import org.apache.activemq.util.ByteArrayOutputStream;
040    import org.apache.activemq.util.ByteSequence;
041    import org.apache.activemq.util.JMSExceptionSupport;
042    import org.apache.activemq.util.MarshallingSupport;
043    
044    /**
045     * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
046     * types in the Java programming language. It is filled and read sequentially.
047     * It inherits from the <CODE>Message</CODE> interface and adds a stream
048     * message body. Its methods are based largely on those found in
049     * <CODE>java.io.DataInputStream</CODE> and
050     * <CODE>java.io.DataOutputStream</CODE>. <p/>
051     * <P>
052     * The primitive types can be read or written explicitly using methods for each
053     * type. They may also be read or written generically as objects. For instance,
054     * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
055     * <CODE>StreamMessage.writeObject(new
056     * Integer(6))</CODE>. Both forms are
057     * provided, because the explicit form is convenient for static programming, and
058     * the object form is needed when types are not known at compile time. <p/>
059     * <P>
060     * When the message is first created, and when <CODE>clearBody</CODE> is
061     * called, the body of the message is in write-only mode. After the first call
062     * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
063     * After a message has been sent, the client that sent it can retain and modify
064     * it without affecting the message that has been sent. The same message object
065     * can be sent multiple times. When a message has been received, the provider
066     * has called <CODE>reset</CODE> so that the message body is in read-only mode
067     * for the client. <p/>
068     * <P>
069     * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
070     * message body is cleared and the message body is in write-only mode. <p/>
071     * <P>
072     * If a client attempts to read a message in write-only mode, a
073     * <CODE>MessageNotReadableException</CODE> is thrown. <p/>
074     * <P>
075     * If a client attempts to write a message in read-only mode, a
076     * <CODE>MessageNotWriteableException</CODE> is thrown. <p/>
077     * <P>
078     * <CODE>StreamMessage</CODE> objects support the following conversion table.
079     * The marked cases must be supported. The unmarked cases must throw a
080     * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive
081     * conversions may throw a runtime exception if the primitive's
082     * <CODE>valueOf()</CODE> method does not accept it as a valid
083     * <CODE>String</CODE> representation of the primitive. <p/>
084     * <P>
085     * A value written as the row type can be read as the column type. <p/>
086     *
 * <PRE>
 * |        | boolean byte short char int long float double String byte[]
 * |----------------------------------------------------------------------
 * |boolean |    X                                            X
 * |byte    |          X     X         X   X                  X
 * |short   |                X         X   X                  X
 * |char    |                     X                           X
 * |int     |                          X   X                  X
 * |long    |                              X                  X
 * |float   |                                    X     X      X
 * |double  |                                          X      X
 * |String  |    X     X     X         X   X     X     X      X
 * |byte[]  |                                                        X
 * |----------------------------------------------------------------------
 * </PRE>
095     *
096     * <p/>
097     * <P>
098     * Attempting to read a null value as a primitive type must be treated as
099     * calling the primitive's corresponding <code>valueOf(String)</code>
100     * conversion method with a null value. Since <code>char</code> does not
101     * support a <code>String</code> conversion, attempting to read a null value
102     * as a <code>char</code> must throw a <code>NullPointerException</code>.
103     *
104     * @openwire:marshaller code="27"
105     * @see javax.jms.Session#createStreamMessage()
106     * @see javax.jms.BytesMessage
107     * @see javax.jms.MapMessage
108     * @see javax.jms.Message
109     * @see javax.jms.ObjectMessage
110     * @see javax.jms.TextMessage
111     */
112    public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
113    
    /** Type code returned by {@code getDataStructureType()}. */
    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE;

    /** Output stream that {@code storeContent()} closes before storing the body; nulled by {@code storeContent()} and {@code clearBody()}. */
    protected transient DataOutputStream dataOut;
    /** Byte buffer whose contents {@code storeContent()} installs as the message content. */
    protected transient ByteArrayOutputStream bytesOut;
    /** Input stream the {@code readXxx} methods consume; nulled by {@code clearBody()}. */
    protected transient DataInputStream dataIn;
    /** Bytes still unread in the byte-array field being read by {@code readBytes}; -1 when no such read is in progress. */
    protected transient int remainingBytes = -1;
120    
121        public Message copy() {
122            ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
123            copy(copy);
124            return copy;
125        }
126    
127        private void copy(ActiveMQStreamMessage copy) {
128            storeContent();
129            super.copy(copy);
130            copy.dataOut = null;
131            copy.bytesOut = null;
132            copy.dataIn = null;
133        }
134    
135        public void onSend() throws JMSException {
136            super.onSend();
137            storeContent();
138        }
139    
140        @Override
141        public void storeContent() {
142            if (dataOut != null) {
143                try {
144                    dataOut.close();
145                    setContent(bytesOut.toByteSequence());
146                    bytesOut = null;
147                    dataOut = null;
148                } catch (IOException ioe) {
149                    throw new RuntimeException(ioe);
150                }
151            }
152        }
153    
154        public byte getDataStructureType() {
155            return DATA_STRUCTURE_TYPE;
156        }
157    
158        public String getJMSXMimeType() {
159            return "jms/stream-message";
160        }
161    
162        /**
163         * Clears out the message body. Clearing a message's body does not clear its
164         * header values or property entries. <p/>
165         * <P>
166         * If this message body was read-only, calling this method leaves the
167         * message body in the same state as an empty body in a newly created
168         * message.
169         *
170         * @throws JMSException if the JMS provider fails to clear the message body
171         *                 due to some internal error.
172         */
173    
174        public void clearBody() throws JMSException {
175            super.clearBody();
176            this.dataOut = null;
177            this.dataIn = null;
178            this.bytesOut = null;
179            this.remainingBytes = -1;
180        }
181    
182        /**
183         * Reads a <code>boolean</code> from the stream message.
184         *
185         * @return the <code>boolean</code> value read
186         * @throws JMSException if the JMS provider fails to read the message due to
187         *                 some internal error.
188         * @throws MessageEOFException if unexpected end of message stream has been
189         *                 reached.
190         * @throws MessageFormatException if this type conversion is invalid.
191         * @throws MessageNotReadableException if the message is in write-only mode.
192         */
193    
194        public boolean readBoolean() throws JMSException {
195            initializeReading();
196            try {
197    
198                this.dataIn.mark(10);
199                int type = this.dataIn.read();
200                if (type == -1) {
201                    throw new MessageEOFException("reached end of data");
202                }
203                if (type == MarshallingSupport.BOOLEAN_TYPE) {
204                    return this.dataIn.readBoolean();
205                }
206                if (type == MarshallingSupport.STRING_TYPE) {
207                    return Boolean.valueOf(this.dataIn.readUTF()).booleanValue();
208                }
209                if (type == MarshallingSupport.NULL) {
210                    this.dataIn.reset();
211                    throw new NullPointerException("Cannot convert NULL value to boolean.");
212                } else {
213                    this.dataIn.reset();
214                    throw new MessageFormatException(" not a boolean type");
215                }
216            } catch (EOFException e) {
217                throw JMSExceptionSupport.createMessageEOFException(e);
218            } catch (IOException e) {
219                throw JMSExceptionSupport.createMessageFormatException(e);
220            }
221        }
222    
223        /**
224         * Reads a <code>byte</code> value from the stream message.
225         *
226         * @return the next byte from the stream message as a 8-bit
227         *         <code>byte</code>
228         * @throws JMSException if the JMS provider fails to read the message due to
229         *                 some internal error.
230         * @throws MessageEOFException if unexpected end of message stream has been
231         *                 reached.
232         * @throws MessageFormatException if this type conversion is invalid.
233         * @throws MessageNotReadableException if the message is in write-only mode.
234         */
235    
236        public byte readByte() throws JMSException {
237            initializeReading();
238            try {
239    
240                this.dataIn.mark(10);
241                int type = this.dataIn.read();
242                if (type == -1) {
243                    throw new MessageEOFException("reached end of data");
244                }
245                if (type == MarshallingSupport.BYTE_TYPE) {
246                    return this.dataIn.readByte();
247                }
248                if (type == MarshallingSupport.STRING_TYPE) {
249                    return Byte.valueOf(this.dataIn.readUTF()).byteValue();
250                }
251                if (type == MarshallingSupport.NULL) {
252                    this.dataIn.reset();
253                    throw new NullPointerException("Cannot convert NULL value to byte.");
254                } else {
255                    this.dataIn.reset();
256                    throw new MessageFormatException(" not a byte type");
257                }
258            } catch (NumberFormatException mfe) {
259                try {
260                    this.dataIn.reset();
261                } catch (IOException ioe) {
262                    throw JMSExceptionSupport.create(ioe);
263                }
264                throw mfe;
265    
266            } catch (EOFException e) {
267                throw JMSExceptionSupport.createMessageEOFException(e);
268            } catch (IOException e) {
269                throw JMSExceptionSupport.createMessageFormatException(e);
270            }
271        }
272    
273        /**
274         * Reads a 16-bit integer from the stream message.
275         *
276         * @return a 16-bit integer from the stream message
277         * @throws JMSException if the JMS provider fails to read the message due to
278         *                 some internal error.
279         * @throws MessageEOFException if unexpected end of message stream has been
280         *                 reached.
281         * @throws MessageFormatException if this type conversion is invalid.
282         * @throws MessageNotReadableException if the message is in write-only mode.
283         */
284    
285        public short readShort() throws JMSException {
286            initializeReading();
287            try {
288    
289                this.dataIn.mark(17);
290                int type = this.dataIn.read();
291                if (type == -1) {
292                    throw new MessageEOFException("reached end of data");
293                }
294                if (type == MarshallingSupport.SHORT_TYPE) {
295                    return this.dataIn.readShort();
296                }
297                if (type == MarshallingSupport.BYTE_TYPE) {
298                    return this.dataIn.readByte();
299                }
300                if (type == MarshallingSupport.STRING_TYPE) {
301                    return Short.valueOf(this.dataIn.readUTF()).shortValue();
302                }
303                if (type == MarshallingSupport.NULL) {
304                    this.dataIn.reset();
305                    throw new NullPointerException("Cannot convert NULL value to short.");
306                } else {
307                    this.dataIn.reset();
308                    throw new MessageFormatException(" not a short type");
309                }
310            } catch (NumberFormatException mfe) {
311                try {
312                    this.dataIn.reset();
313                } catch (IOException ioe) {
314                    throw JMSExceptionSupport.create(ioe);
315                }
316                throw mfe;
317    
318            } catch (EOFException e) {
319                throw JMSExceptionSupport.createMessageEOFException(e);
320            } catch (IOException e) {
321                throw JMSExceptionSupport.createMessageFormatException(e);
322            }
323    
324        }
325    
326        /**
327         * Reads a Unicode character value from the stream message.
328         *
329         * @return a Unicode character from the stream message
330         * @throws JMSException if the JMS provider fails to read the message due to
331         *                 some internal error.
332         * @throws MessageEOFException if unexpected end of message stream has been
333         *                 reached.
334         * @throws MessageFormatException if this type conversion is invalid
335         * @throws MessageNotReadableException if the message is in write-only mode.
336         */
337    
338        public char readChar() throws JMSException {
339            initializeReading();
340            try {
341    
342                this.dataIn.mark(17);
343                int type = this.dataIn.read();
344                if (type == -1) {
345                    throw new MessageEOFException("reached end of data");
346                }
347                if (type == MarshallingSupport.CHAR_TYPE) {
348                    return this.dataIn.readChar();
349                }
350                if (type == MarshallingSupport.NULL) {
351                    this.dataIn.reset();
352                    throw new NullPointerException("Cannot convert NULL value to char.");
353                } else {
354                    this.dataIn.reset();
355                    throw new MessageFormatException(" not a char type");
356                }
357            } catch (NumberFormatException mfe) {
358                try {
359                    this.dataIn.reset();
360                } catch (IOException ioe) {
361                    throw JMSExceptionSupport.create(ioe);
362                }
363                throw mfe;
364    
365            } catch (EOFException e) {
366                throw JMSExceptionSupport.createMessageEOFException(e);
367            } catch (IOException e) {
368                throw JMSExceptionSupport.createMessageFormatException(e);
369            }
370        }
371    
372        /**
373         * Reads a 32-bit integer from the stream message.
374         *
375         * @return a 32-bit integer value from the stream message, interpreted as an
376         *         <code>int</code>
377         * @throws JMSException if the JMS provider fails to read the message due to
378         *                 some internal error.
379         * @throws MessageEOFException if unexpected end of message stream has been
380         *                 reached.
381         * @throws MessageFormatException if this type conversion is invalid.
382         * @throws MessageNotReadableException if the message is in write-only mode.
383         */
384    
385        public int readInt() throws JMSException {
386            initializeReading();
387            try {
388    
389                this.dataIn.mark(33);
390                int type = this.dataIn.read();
391                if (type == -1) {
392                    throw new MessageEOFException("reached end of data");
393                }
394                if (type == MarshallingSupport.INTEGER_TYPE) {
395                    return this.dataIn.readInt();
396                }
397                if (type == MarshallingSupport.SHORT_TYPE) {
398                    return this.dataIn.readShort();
399                }
400                if (type == MarshallingSupport.BYTE_TYPE) {
401                    return this.dataIn.readByte();
402                }
403                if (type == MarshallingSupport.STRING_TYPE) {
404                    return Integer.valueOf(this.dataIn.readUTF()).intValue();
405                }
406                if (type == MarshallingSupport.NULL) {
407                    this.dataIn.reset();
408                    throw new NullPointerException("Cannot convert NULL value to int.");
409                } else {
410                    this.dataIn.reset();
411                    throw new MessageFormatException(" not an int type");
412                }
413            } catch (NumberFormatException mfe) {
414                try {
415                    this.dataIn.reset();
416                } catch (IOException ioe) {
417                    throw JMSExceptionSupport.create(ioe);
418                }
419                throw mfe;
420    
421            } catch (EOFException e) {
422                throw JMSExceptionSupport.createMessageEOFException(e);
423            } catch (IOException e) {
424                throw JMSExceptionSupport.createMessageFormatException(e);
425            }
426        }
427    
428        /**
429         * Reads a 64-bit integer from the stream message.
430         *
431         * @return a 64-bit integer value from the stream message, interpreted as a
432         *         <code>long</code>
433         * @throws JMSException if the JMS provider fails to read the message due to
434         *                 some internal error.
435         * @throws MessageEOFException if unexpected end of message stream has been
436         *                 reached.
437         * @throws MessageFormatException if this type conversion is invalid.
438         * @throws MessageNotReadableException if the message is in write-only mode.
439         */
440    
441        public long readLong() throws JMSException {
442            initializeReading();
443            try {
444    
445                this.dataIn.mark(65);
446                int type = this.dataIn.read();
447                if (type == -1) {
448                    throw new MessageEOFException("reached end of data");
449                }
450                if (type == MarshallingSupport.LONG_TYPE) {
451                    return this.dataIn.readLong();
452                }
453                if (type == MarshallingSupport.INTEGER_TYPE) {
454                    return this.dataIn.readInt();
455                }
456                if (type == MarshallingSupport.SHORT_TYPE) {
457                    return this.dataIn.readShort();
458                }
459                if (type == MarshallingSupport.BYTE_TYPE) {
460                    return this.dataIn.readByte();
461                }
462                if (type == MarshallingSupport.STRING_TYPE) {
463                    return Long.valueOf(this.dataIn.readUTF()).longValue();
464                }
465                if (type == MarshallingSupport.NULL) {
466                    this.dataIn.reset();
467                    throw new NullPointerException("Cannot convert NULL value to long.");
468                } else {
469                    this.dataIn.reset();
470                    throw new MessageFormatException(" not a long type");
471                }
472            } catch (NumberFormatException mfe) {
473                try {
474                    this.dataIn.reset();
475                } catch (IOException ioe) {
476                    throw JMSExceptionSupport.create(ioe);
477                }
478                throw mfe;
479    
480            } catch (EOFException e) {
481                throw JMSExceptionSupport.createMessageEOFException(e);
482            } catch (IOException e) {
483                throw JMSExceptionSupport.createMessageFormatException(e);
484            }
485        }
486    
487        /**
488         * Reads a <code>float</code> from the stream message.
489         *
490         * @return a <code>float</code> value from the stream message
491         * @throws JMSException if the JMS provider fails to read the message due to
492         *                 some internal error.
493         * @throws MessageEOFException if unexpected end of message stream has been
494         *                 reached.
495         * @throws MessageFormatException if this type conversion is invalid.
496         * @throws MessageNotReadableException if the message is in write-only mode.
497         */
498    
499        public float readFloat() throws JMSException {
500            initializeReading();
501            try {
502                this.dataIn.mark(33);
503                int type = this.dataIn.read();
504                if (type == -1) {
505                    throw new MessageEOFException("reached end of data");
506                }
507                if (type == MarshallingSupport.FLOAT_TYPE) {
508                    return this.dataIn.readFloat();
509                }
510                if (type == MarshallingSupport.STRING_TYPE) {
511                    return Float.valueOf(this.dataIn.readUTF()).floatValue();
512                }
513                if (type == MarshallingSupport.NULL) {
514                    this.dataIn.reset();
515                    throw new NullPointerException("Cannot convert NULL value to float.");
516                } else {
517                    this.dataIn.reset();
518                    throw new MessageFormatException(" not a float type");
519                }
520            } catch (NumberFormatException mfe) {
521                try {
522                    this.dataIn.reset();
523                } catch (IOException ioe) {
524                    throw JMSExceptionSupport.create(ioe);
525                }
526                throw mfe;
527    
528            } catch (EOFException e) {
529                throw JMSExceptionSupport.createMessageEOFException(e);
530            } catch (IOException e) {
531                throw JMSExceptionSupport.createMessageFormatException(e);
532            }
533        }
534    
535        /**
536         * Reads a <code>double</code> from the stream message.
537         *
538         * @return a <code>double</code> value from the stream message
539         * @throws JMSException if the JMS provider fails to read the message due to
540         *                 some internal error.
541         * @throws MessageEOFException if unexpected end of message stream has been
542         *                 reached.
543         * @throws MessageFormatException if this type conversion is invalid.
544         * @throws MessageNotReadableException if the message is in write-only mode.
545         */
546    
547        public double readDouble() throws JMSException {
548            initializeReading();
549            try {
550    
551                this.dataIn.mark(65);
552                int type = this.dataIn.read();
553                if (type == -1) {
554                    throw new MessageEOFException("reached end of data");
555                }
556                if (type == MarshallingSupport.DOUBLE_TYPE) {
557                    return this.dataIn.readDouble();
558                }
559                if (type == MarshallingSupport.FLOAT_TYPE) {
560                    return this.dataIn.readFloat();
561                }
562                if (type == MarshallingSupport.STRING_TYPE) {
563                    return Double.valueOf(this.dataIn.readUTF()).doubleValue();
564                }
565                if (type == MarshallingSupport.NULL) {
566                    this.dataIn.reset();
567                    throw new NullPointerException("Cannot convert NULL value to double.");
568                } else {
569                    this.dataIn.reset();
570                    throw new MessageFormatException(" not a double type");
571                }
572            } catch (NumberFormatException mfe) {
573                try {
574                    this.dataIn.reset();
575                } catch (IOException ioe) {
576                    throw JMSExceptionSupport.create(ioe);
577                }
578                throw mfe;
579    
580            } catch (EOFException e) {
581                throw JMSExceptionSupport.createMessageEOFException(e);
582            } catch (IOException e) {
583                throw JMSExceptionSupport.createMessageFormatException(e);
584            }
585        }
586    
587        /**
588         * Reads a <CODE>String</CODE> from the stream message.
589         *
590         * @return a Unicode string from the stream message
591         * @throws JMSException if the JMS provider fails to read the message due to
592         *                 some internal error.
593         * @throws MessageEOFException if unexpected end of message stream has been
594         *                 reached.
595         * @throws MessageFormatException if this type conversion is invalid.
596         * @throws MessageNotReadableException if the message is in write-only mode.
597         */
598    
599        public String readString() throws JMSException {
600            initializeReading();
601            try {
602    
603                this.dataIn.mark(65);
604                int type = this.dataIn.read();
605                if (type == -1) {
606                    throw new MessageEOFException("reached end of data");
607                }
608                if (type == MarshallingSupport.NULL) {
609                    return null;
610                }
611                if (type == MarshallingSupport.BIG_STRING_TYPE) {
612                    return MarshallingSupport.readUTF8(dataIn);
613                }
614                if (type == MarshallingSupport.STRING_TYPE) {
615                    return this.dataIn.readUTF();
616                }
617                if (type == MarshallingSupport.LONG_TYPE) {
618                    return new Long(this.dataIn.readLong()).toString();
619                }
620                if (type == MarshallingSupport.INTEGER_TYPE) {
621                    return new Integer(this.dataIn.readInt()).toString();
622                }
623                if (type == MarshallingSupport.SHORT_TYPE) {
624                    return new Short(this.dataIn.readShort()).toString();
625                }
626                if (type == MarshallingSupport.BYTE_TYPE) {
627                    return new Byte(this.dataIn.readByte()).toString();
628                }
629                if (type == MarshallingSupport.FLOAT_TYPE) {
630                    return new Float(this.dataIn.readFloat()).toString();
631                }
632                if (type == MarshallingSupport.DOUBLE_TYPE) {
633                    return new Double(this.dataIn.readDouble()).toString();
634                }
635                if (type == MarshallingSupport.BOOLEAN_TYPE) {
636                    return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
637                }
638                if (type == MarshallingSupport.CHAR_TYPE) {
639                    return new Character(this.dataIn.readChar()).toString();
640                } else {
641                    this.dataIn.reset();
642                    throw new MessageFormatException(" not a String type");
643                }
644            } catch (NumberFormatException mfe) {
645                try {
646                    this.dataIn.reset();
647                } catch (IOException ioe) {
648                    throw JMSExceptionSupport.create(ioe);
649                }
650                throw mfe;
651    
652            } catch (EOFException e) {
653                throw JMSExceptionSupport.createMessageEOFException(e);
654            } catch (IOException e) {
655                throw JMSExceptionSupport.createMessageFormatException(e);
656            }
657        }
658    
659        /**
660         * Reads a byte array field from the stream message into the specified
661         * <CODE>byte[]</CODE> object (the read buffer). <p/>
662         * <P>
663         * To read the field value, <CODE>readBytes</CODE> should be successively
664         * called until it returns a value less than the length of the read buffer.
665         * The value of the bytes in the buffer following the last byte read is
666         * undefined. <p/>
667         * <P>
668         * If <CODE>readBytes</CODE> returns a value equal to the length of the
669         * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
670         * are no more bytes to be read, this call returns -1. <p/>
671         * <P>
672         * If the byte array field value is null, <CODE>readBytes</CODE> returns
673         * -1. <p/>
674         * <P>
675         * If the byte array field value is empty, <CODE>readBytes</CODE> returns
676         * 0. <p/>
677         * <P>
678         * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE>
679         * field value has been made, the full value of the field must be read
680         * before it is valid to read the next field. An attempt to read the next
681         * field before that has been done will throw a
682         * <CODE>MessageFormatException</CODE>. <p/>
683         * <P>
684         * To read the byte field value into a new <CODE>byte[]</CODE> object, use
685         * the <CODE>readObject</CODE> method.
686         *
687         * @param value the buffer into which the data is read
688         * @return the total number of bytes read into the buffer, or -1 if there is
689         *         no more data because the end of the byte field has been reached
690         * @throws JMSException if the JMS provider fails to read the message due to
691         *                 some internal error.
692         * @throws MessageEOFException if unexpected end of message stream has been
693         *                 reached.
694         * @throws MessageFormatException if this type conversion is invalid.
695         * @throws MessageNotReadableException if the message is in write-only mode.
696         * @see #readObject()
697         */
698    
699        public int readBytes(byte[] value) throws JMSException {
700    
701            initializeReading();
702            try {
703                if (value == null) {
704                    throw new NullPointerException();
705                }
706    
707                if (remainingBytes == -1) {
708                    this.dataIn.mark(value.length + 1);
709                    int type = this.dataIn.read();
710                    if (type == -1) {
711                        throw new MessageEOFException("reached end of data");
712                    }
713                    if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
714                        throw new MessageFormatException("Not a byte array");
715                    }
716                    remainingBytes = this.dataIn.readInt();
717                } else if (remainingBytes == 0) {
718                    remainingBytes = -1;
719                    return -1;
720                }
721    
722                if (value.length <= remainingBytes) {
723                    // small buffer
724                    remainingBytes -= value.length;
725                    this.dataIn.readFully(value);
726                    return value.length;
727                } else {
728                    // big buffer
729                    int rc = this.dataIn.read(value, 0, remainingBytes);
730                    remainingBytes = 0;
731                    return rc;
732                }
733    
734            } catch (EOFException e) {
735                JMSException jmsEx = new MessageEOFException(e.getMessage());
736                jmsEx.setLinkedException(e);
737                throw jmsEx;
738            } catch (IOException e) {
739                JMSException jmsEx = new MessageFormatException(e.getMessage());
740                jmsEx.setLinkedException(e);
741                throw jmsEx;
742            }
743        }
744    
745        /**
746         * Reads an object from the stream message. <p/>
747         * <P>
748         * This method can be used to return, in objectified format, an object in
749         * the Java programming language ("Java object") that has been written to
750         * the stream with the equivalent <CODE>writeObject</CODE> method call, or
751         * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
752         * <P>
753         * Note that byte values are returned as <CODE>byte[]</CODE>, not
754         * <CODE>Byte[]</CODE>. <p/>
755         * <P>
756         * An attempt to call <CODE>readObject</CODE> to read a byte field value
757         * into a new <CODE>byte[]</CODE> object before the full value of the byte
758         * field has been read will throw a <CODE>MessageFormatException</CODE>.
759         *
760         * @return a Java object from the stream message, in objectified format (for
761         *         example, if the object was written as an <CODE>int</CODE>, an
762         *         <CODE>Integer</CODE> is returned)
763         * @throws JMSException if the JMS provider fails to read the message due to
764         *                 some internal error.
765         * @throws MessageEOFException if unexpected end of message stream has been
766         *                 reached.
767         * @throws MessageFormatException if this type conversion is invalid.
768         * @throws MessageNotReadableException if the message is in write-only mode.
769         * @see #readBytes(byte[] value)
770         */
771    
772        public Object readObject() throws JMSException {
773            initializeReading();
774            try {
775                this.dataIn.mark(65);
776                int type = this.dataIn.read();
777                if (type == -1) {
778                    throw new MessageEOFException("reached end of data");
779                }
780                if (type == MarshallingSupport.NULL) {
781                    return null;
782                }
783                if (type == MarshallingSupport.BIG_STRING_TYPE) {
784                    return MarshallingSupport.readUTF8(dataIn);
785                }
786                if (type == MarshallingSupport.STRING_TYPE) {
787                    return this.dataIn.readUTF();
788                }
789                if (type == MarshallingSupport.LONG_TYPE) {
790                    return Long.valueOf(this.dataIn.readLong());
791                }
792                if (type == MarshallingSupport.INTEGER_TYPE) {
793                    return Integer.valueOf(this.dataIn.readInt());
794                }
795                if (type == MarshallingSupport.SHORT_TYPE) {
796                    return Short.valueOf(this.dataIn.readShort());
797                }
798                if (type == MarshallingSupport.BYTE_TYPE) {
799                    return Byte.valueOf(this.dataIn.readByte());
800                }
801                if (type == MarshallingSupport.FLOAT_TYPE) {
802                    return new Float(this.dataIn.readFloat());
803                }
804                if (type == MarshallingSupport.DOUBLE_TYPE) {
805                    return new Double(this.dataIn.readDouble());
806                }
807                if (type == MarshallingSupport.BOOLEAN_TYPE) {
808                    return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
809                }
810                if (type == MarshallingSupport.CHAR_TYPE) {
811                    return Character.valueOf(this.dataIn.readChar());
812                }
813                if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
814                    int len = this.dataIn.readInt();
815                    byte[] value = new byte[len];
816                    this.dataIn.readFully(value);
817                    return value;
818                } else {
819                    this.dataIn.reset();
820                    throw new MessageFormatException("unknown type");
821                }
822            } catch (NumberFormatException mfe) {
823                try {
824                    this.dataIn.reset();
825                } catch (IOException ioe) {
826                    throw JMSExceptionSupport.create(ioe);
827                }
828                throw mfe;
829    
830            } catch (EOFException e) {
831                JMSException jmsEx = new MessageEOFException(e.getMessage());
832                jmsEx.setLinkedException(e);
833                throw jmsEx;
834            } catch (IOException e) {
835                JMSException jmsEx = new MessageFormatException(e.getMessage());
836                jmsEx.setLinkedException(e);
837                throw jmsEx;
838            }
839        }
840    
841        /**
842         * Writes a <code>boolean</code> to the stream message. The value
843         * <code>true</code> is written as the value <code>(byte)1</code>; the
844         * value <code>false</code> is written as the value <code>(byte)0</code>.
845         *
846         * @param value the <code>boolean</code> value to be written
847         * @throws JMSException if the JMS provider fails to write the message due
848         *                 to some internal error.
849         * @throws MessageNotWriteableException if the message is in read-only mode.
850         */
851    
852        public void writeBoolean(boolean value) throws JMSException {
853            initializeWriting();
854            try {
855                MarshallingSupport.marshalBoolean(dataOut, value);
856            } catch (IOException ioe) {
857                throw JMSExceptionSupport.create(ioe);
858            }
859        }
860    
861        /**
862         * Writes a <code>byte</code> to the stream message.
863         *
864         * @param value the <code>byte</code> value to be written
865         * @throws JMSException if the JMS provider fails to write the message due
866         *                 to some internal error.
867         * @throws MessageNotWriteableException if the message is in read-only mode.
868         */
869    
870        public void writeByte(byte value) throws JMSException {
871            initializeWriting();
872            try {
873                MarshallingSupport.marshalByte(dataOut, value);
874            } catch (IOException ioe) {
875                throw JMSExceptionSupport.create(ioe);
876            }
877        }
878    
879        /**
880         * Writes a <code>short</code> to the stream message.
881         *
882         * @param value the <code>short</code> value to be written
883         * @throws JMSException if the JMS provider fails to write the message due
884         *                 to some internal error.
885         * @throws MessageNotWriteableException if the message is in read-only mode.
886         */
887    
888        public void writeShort(short value) throws JMSException {
889            initializeWriting();
890            try {
891                MarshallingSupport.marshalShort(dataOut, value);
892            } catch (IOException ioe) {
893                throw JMSExceptionSupport.create(ioe);
894            }
895        }
896    
897        /**
898         * Writes a <code>char</code> to the stream message.
899         *
900         * @param value the <code>char</code> value to be written
901         * @throws JMSException if the JMS provider fails to write the message due
902         *                 to some internal error.
903         * @throws MessageNotWriteableException if the message is in read-only mode.
904         */
905    
906        public void writeChar(char value) throws JMSException {
907            initializeWriting();
908            try {
909                MarshallingSupport.marshalChar(dataOut, value);
910            } catch (IOException ioe) {
911                throw JMSExceptionSupport.create(ioe);
912            }
913        }
914    
915        /**
916         * Writes an <code>int</code> to the stream message.
917         *
918         * @param value the <code>int</code> value to be written
919         * @throws JMSException if the JMS provider fails to write the message due
920         *                 to some internal error.
921         * @throws MessageNotWriteableException if the message is in read-only mode.
922         */
923    
924        public void writeInt(int value) throws JMSException {
925            initializeWriting();
926            try {
927                MarshallingSupport.marshalInt(dataOut, value);
928            } catch (IOException ioe) {
929                throw JMSExceptionSupport.create(ioe);
930            }
931        }
932    
933        /**
934         * Writes a <code>long</code> to the stream message.
935         *
936         * @param value the <code>long</code> value to be written
937         * @throws JMSException if the JMS provider fails to write the message due
938         *                 to some internal error.
939         * @throws MessageNotWriteableException if the message is in read-only mode.
940         */
941    
942        public void writeLong(long value) throws JMSException {
943            initializeWriting();
944            try {
945                MarshallingSupport.marshalLong(dataOut, value);
946            } catch (IOException ioe) {
947                throw JMSExceptionSupport.create(ioe);
948            }
949        }
950    
951        /**
952         * Writes a <code>float</code> to the stream message.
953         *
954         * @param value the <code>float</code> value to be written
955         * @throws JMSException if the JMS provider fails to write the message due
956         *                 to some internal error.
957         * @throws MessageNotWriteableException if the message is in read-only mode.
958         */
959    
960        public void writeFloat(float value) throws JMSException {
961            initializeWriting();
962            try {
963                MarshallingSupport.marshalFloat(dataOut, value);
964            } catch (IOException ioe) {
965                throw JMSExceptionSupport.create(ioe);
966            }
967        }
968    
969        /**
970         * Writes a <code>double</code> to the stream message.
971         *
972         * @param value the <code>double</code> value to be written
973         * @throws JMSException if the JMS provider fails to write the message due
974         *                 to some internal error.
975         * @throws MessageNotWriteableException if the message is in read-only mode.
976         */
977    
978        public void writeDouble(double value) throws JMSException {
979            initializeWriting();
980            try {
981                MarshallingSupport.marshalDouble(dataOut, value);
982            } catch (IOException ioe) {
983                throw JMSExceptionSupport.create(ioe);
984            }
985        }
986    
987        /**
988         * Writes a <code>String</code> to the stream message.
989         *
990         * @param value the <code>String</code> value to be written
991         * @throws JMSException if the JMS provider fails to write the message due
992         *                 to some internal error.
993         * @throws MessageNotWriteableException if the message is in read-only mode.
994         */
995    
996        public void writeString(String value) throws JMSException {
997            initializeWriting();
998            try {
999                if (value == null) {
1000                    MarshallingSupport.marshalNull(dataOut);
1001                } else {
1002                    MarshallingSupport.marshalString(dataOut, value);
1003                }
1004            } catch (IOException ioe) {
1005                throw JMSExceptionSupport.create(ioe);
1006            }
1007        }
1008    
1009        /**
1010         * Writes a byte array field to the stream message. <p/>
1011         * <P>
1012         * The byte array <code>value</code> is written to the message as a byte
1013         * array field. Consecutively written byte array fields are treated as two
1014         * distinct fields when the fields are read.
1015         *
1016         * @param value the byte array value to be written
1017         * @throws JMSException if the JMS provider fails to write the message due
1018         *                 to some internal error.
1019         * @throws MessageNotWriteableException if the message is in read-only mode.
1020         */
1021    
1022        public void writeBytes(byte[] value) throws JMSException {
1023            writeBytes(value, 0, value.length);
1024        }
1025    
1026        /**
1027         * Writes a portion of a byte array as a byte array field to the stream
1028         * message. <p/>
1029         * <P>
1030         * The a portion of the byte array <code>value</code> is written to the
1031         * message as a byte array field. Consecutively written byte array fields
1032         * are treated as two distinct fields when the fields are read.
1033         *
1034         * @param value the byte array value to be written
1035         * @param offset the initial offset within the byte array
1036         * @param length the number of bytes to use
1037         * @throws JMSException if the JMS provider fails to write the message due
1038         *                 to some internal error.
1039         * @throws MessageNotWriteableException if the message is in read-only mode.
1040         */
1041    
1042        public void writeBytes(byte[] value, int offset, int length) throws JMSException {
1043            initializeWriting();
1044            try {
1045                MarshallingSupport.marshalByteArray(dataOut, value, offset, length);
1046            } catch (IOException ioe) {
1047                throw JMSExceptionSupport.create(ioe);
1048            }
1049        }
1050    
1051        /**
1052         * Writes an object to the stream message. <p/>
1053         * <P>
1054         * This method works only for the objectified primitive object types (<code>Integer</code>,
1055         * <code>Double</code>, <code>Long</code>&nbsp;...),
1056         * <code>String</code> objects, and byte arrays.
1057         *
1058         * @param value the Java object to be written
1059         * @throws JMSException if the JMS provider fails to write the message due
1060         *                 to some internal error.
1061         * @throws MessageFormatException if the object is invalid.
1062         * @throws MessageNotWriteableException if the message is in read-only mode.
1063         */
1064    
1065        public void writeObject(Object value) throws JMSException {
1066            initializeWriting();
1067            if (value == null) {
1068                try {
1069                    MarshallingSupport.marshalNull(dataOut);
1070                } catch (IOException ioe) {
1071                    throw JMSExceptionSupport.create(ioe);
1072                }
1073            } else if (value instanceof String) {
1074                writeString(value.toString());
1075            } else if (value instanceof Character) {
1076                writeChar(((Character)value).charValue());
1077            } else if (value instanceof Boolean) {
1078                writeBoolean(((Boolean)value).booleanValue());
1079            } else if (value instanceof Byte) {
1080                writeByte(((Byte)value).byteValue());
1081            } else if (value instanceof Short) {
1082                writeShort(((Short)value).shortValue());
1083            } else if (value instanceof Integer) {
1084                writeInt(((Integer)value).intValue());
1085            } else if (value instanceof Float) {
1086                writeFloat(((Float)value).floatValue());
1087            } else if (value instanceof Double) {
1088                writeDouble(((Double)value).doubleValue());
1089            } else if (value instanceof byte[]) {
1090                writeBytes((byte[])value);
1091            }else if (value instanceof Long) {
1092                writeLong(((Long)value).longValue());
1093            }else {
1094                throw new MessageFormatException("Unsupported Object type: " + value.getClass());
1095            }
1096        }
1097    
1098        /**
1099         * Puts the message body in read-only mode and repositions the stream of
1100         * bytes to the beginning.
1101         *
1102         * @throws JMSException if an internal error occurs
1103         */
1104    
1105        public void reset() throws JMSException {
1106            storeContent();
1107            this.bytesOut = null;
1108            this.dataIn = null;
1109            this.dataOut = null;
1110            this.remainingBytes = -1;
1111            setReadOnlyBody(true);
1112        }
1113    
1114        private void initializeWriting() throws MessageNotWriteableException {
1115            checkReadOnlyBody();
1116            if (this.dataOut == null) {
1117                this.bytesOut = new ByteArrayOutputStream();
1118                OutputStream os = bytesOut;
1119                ActiveMQConnection connection = getConnection();
1120                if (connection != null && connection.isUseCompression()) {
1121                    compressed = true;
1122                    os = new DeflaterOutputStream(os);
1123                }
1124                this.dataOut = new DataOutputStream(os);
1125            }
1126        }
1127    
1128        protected void checkWriteOnlyBody() throws MessageNotReadableException {
1129            if (!readOnlyBody) {
1130                throw new MessageNotReadableException("Message body is write-only");
1131            }
1132        }
1133    
1134        private void initializeReading() throws MessageNotReadableException {
1135            checkWriteOnlyBody();
1136            if (this.dataIn == null) {
1137                ByteSequence data = getContent();
1138                if (data == null) {
1139                    data = new ByteSequence(new byte[] {}, 0, 0);
1140                }
1141                InputStream is = new ByteArrayInputStream(data);
1142                if (isCompressed()) {
1143                    is = new InflaterInputStream(is);
1144                    is = new BufferedInputStream(is);
1145                }
1146                this.dataIn = new DataInputStream(is);
1147            }
1148        }
1149    
1150        @Override
1151        public void compress() throws IOException {
1152            storeContent();
1153            super.compress();
1154        }
1155    
1156        public String toString() {
1157            return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
1158        }
1159    }