001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.command;
019
020import java.io.BufferedInputStream;
021import java.io.DataInputStream;
022import java.io.DataOutputStream;
023import java.io.EOFException;
024import java.io.IOException;
025import java.io.InputStream;
026import java.io.OutputStream;
027import java.util.zip.DeflaterOutputStream;
028import java.util.zip.InflaterInputStream;
029
030import javax.jms.JMSException;
031import javax.jms.MessageEOFException;
032import javax.jms.MessageFormatException;
033import javax.jms.MessageNotReadableException;
034import javax.jms.MessageNotWriteableException;
035import javax.jms.StreamMessage;
036
037import org.apache.activemq.ActiveMQConnection;
038import org.apache.activemq.util.ByteArrayInputStream;
039import org.apache.activemq.util.ByteArrayOutputStream;
040import org.apache.activemq.util.ByteSequence;
041import org.apache.activemq.util.JMSExceptionSupport;
042import org.apache.activemq.util.MarshallingSupport;
043
044/**
045 * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
046 * types in the Java programming language. It is filled and read sequentially.
047 * It inherits from the <CODE>Message</CODE> interface and adds a stream
048 * message body. Its methods are based largely on those found in
049 * <CODE>java.io.DataInputStream</CODE> and
050 * <CODE>java.io.DataOutputStream</CODE>. <p/>
051 * <P>
052 * The primitive types can be read or written explicitly using methods for each
053 * type. They may also be read or written generically as objects. For instance,
054 * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
055 * <CODE>StreamMessage.writeObject(new
056 * Integer(6))</CODE>. Both forms are
057 * provided, because the explicit form is convenient for static programming, and
058 * the object form is needed when types are not known at compile time. <p/>
059 * <P>
060 * When the message is first created, and when <CODE>clearBody</CODE> is
061 * called, the body of the message is in write-only mode. After the first call
062 * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
063 * After a message has been sent, the client that sent it can retain and modify
064 * it without affecting the message that has been sent. The same message object
065 * can be sent multiple times. When a message has been received, the provider
066 * has called <CODE>reset</CODE> so that the message body is in read-only mode
067 * for the client. <p/>
068 * <P>
069 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
070 * message body is cleared and the message body is in write-only mode. <p/>
071 * <P>
072 * If a client attempts to read a message in write-only mode, a
073 * <CODE>MessageNotReadableException</CODE> is thrown. <p/>
074 * <P>
075 * If a client attempts to write a message in read-only mode, a
076 * <CODE>MessageNotWriteableException</CODE> is thrown. <p/>
077 * <P>
078 * <CODE>StreamMessage</CODE> objects support the following conversion table.
079 * The marked cases must be supported. The unmarked cases must throw a
080 * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive
081 * conversions may throw a runtime exception if the primitive's
082 * <CODE>valueOf()</CODE> method does not accept it as a valid
083 * <CODE>String</CODE> representation of the primitive. <p/>
084 * <P>
085 * A value written as the row type can be read as the column type. <p/>
086 *
087 * <PRE>
088 *  | | boolean byte short char int long float double String byte[]
089 * |----------------------------------------------------------------------
090 * |boolean | X X |byte | X X X X X |short | X X X X |char | X X |int | X X X
091 * |long | X X |float | X X X |double | X X |String | X X X X X X X X |byte[] |
092 * X |----------------------------------------------------------------------
093 *
094 * </PRE>
095 *
096 * <p/>
097 * <P>
098 * Attempting to read a null value as a primitive type must be treated as
099 * calling the primitive's corresponding <code>valueOf(String)</code>
100 * conversion method with a null value. Since <code>char</code> does not
101 * support a <code>String</code> conversion, attempting to read a null value
102 * as a <code>char</code> must throw a <code>NullPointerException</code>.
103 *
104 * @openwire:marshaller code="27"
105 * @see javax.jms.Session#createStreamMessage()
106 * @see javax.jms.BytesMessage
107 * @see javax.jms.MapMessage
108 * @see javax.jms.Message
109 * @see javax.jms.ObjectMessage
110 * @see javax.jms.TextMessage
111 */
112public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
113
114    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE;
115
116    protected transient DataOutputStream dataOut;
117    protected transient ByteArrayOutputStream bytesOut;
118    protected transient DataInputStream dataIn;
119    protected transient int remainingBytes = -1;
120
121    @Override
122    public Message copy() {
123        ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
124        copy(copy);
125        return copy;
126    }
127
128    private void copy(ActiveMQStreamMessage copy) {
129        storeContent();
130        super.copy(copy);
131        copy.dataOut = null;
132        copy.bytesOut = null;
133        copy.dataIn = null;
134    }
135
136    @Override
137    public void onSend() throws JMSException {
138        super.onSend();
139        storeContent();
140    }
141
142    @Override
143    public void storeContent() {
144        if (dataOut != null) {
145            try {
146                dataOut.close();
147                setContent(bytesOut.toByteSequence());
148                bytesOut = null;
149                dataOut = null;
150            } catch (IOException ioe) {
151                throw new RuntimeException(ioe);
152            }
153        }
154    }
155
156    @Override
157    public byte getDataStructureType() {
158        return DATA_STRUCTURE_TYPE;
159    }
160
161    @Override
162    public String getJMSXMimeType() {
163        return "jms/stream-message";
164    }
165
166    /**
167     * Clears out the message body. Clearing a message's body does not clear its
168     * header values or property entries. <p/>
169     * <P>
170     * If this message body was read-only, calling this method leaves the
171     * message body in the same state as an empty body in a newly created
172     * message.
173     *
174     * @throws JMSException if the JMS provider fails to clear the message body
175     *                 due to some internal error.
176     */
177
178    @Override
179    public void clearBody() throws JMSException {
180        super.clearBody();
181        this.dataOut = null;
182        this.dataIn = null;
183        this.bytesOut = null;
184        this.remainingBytes = -1;
185    }
186
187    /**
188     * Reads a <code>boolean</code> from the stream message.
189     *
190     * @return the <code>boolean</code> value read
191     * @throws JMSException if the JMS provider fails to read the message due to
192     *                 some internal error.
193     * @throws MessageEOFException if unexpected end of message stream has been
194     *                 reached.
195     * @throws MessageFormatException if this type conversion is invalid.
196     * @throws MessageNotReadableException if the message is in write-only mode.
197     */
198
199    @Override
200    public boolean readBoolean() throws JMSException {
201        initializeReading();
202        try {
203
204            this.dataIn.mark(10);
205            int type = this.dataIn.read();
206            if (type == -1) {
207                throw new MessageEOFException("reached end of data");
208            }
209            if (type == MarshallingSupport.BOOLEAN_TYPE) {
210                return this.dataIn.readBoolean();
211            }
212            if (type == MarshallingSupport.STRING_TYPE) {
213                return Boolean.valueOf(this.dataIn.readUTF()).booleanValue();
214            }
215            if (type == MarshallingSupport.NULL) {
216                this.dataIn.reset();
217                throw new NullPointerException("Cannot convert NULL value to boolean.");
218            } else {
219                this.dataIn.reset();
220                throw new MessageFormatException(" not a boolean type");
221            }
222        } catch (EOFException e) {
223            throw JMSExceptionSupport.createMessageEOFException(e);
224        } catch (IOException e) {
225            throw JMSExceptionSupport.createMessageFormatException(e);
226        }
227    }
228
229    /**
230     * Reads a <code>byte</code> value from the stream message.
231     *
232     * @return the next byte from the stream message as a 8-bit
233     *         <code>byte</code>
234     * @throws JMSException if the JMS provider fails to read the message due to
235     *                 some internal error.
236     * @throws MessageEOFException if unexpected end of message stream has been
237     *                 reached.
238     * @throws MessageFormatException if this type conversion is invalid.
239     * @throws MessageNotReadableException if the message is in write-only mode.
240     */
241
242    @Override
243    public byte readByte() throws JMSException {
244        initializeReading();
245        try {
246
247            this.dataIn.mark(10);
248            int type = this.dataIn.read();
249            if (type == -1) {
250                throw new MessageEOFException("reached end of data");
251            }
252            if (type == MarshallingSupport.BYTE_TYPE) {
253                return this.dataIn.readByte();
254            }
255            if (type == MarshallingSupport.STRING_TYPE) {
256                return Byte.valueOf(this.dataIn.readUTF()).byteValue();
257            }
258            if (type == MarshallingSupport.NULL) {
259                this.dataIn.reset();
260                throw new NullPointerException("Cannot convert NULL value to byte.");
261            } else {
262                this.dataIn.reset();
263                throw new MessageFormatException(" not a byte type");
264            }
265        } catch (NumberFormatException mfe) {
266            try {
267                this.dataIn.reset();
268            } catch (IOException ioe) {
269                throw JMSExceptionSupport.create(ioe);
270            }
271            throw mfe;
272
273        } catch (EOFException e) {
274            throw JMSExceptionSupport.createMessageEOFException(e);
275        } catch (IOException e) {
276            throw JMSExceptionSupport.createMessageFormatException(e);
277        }
278    }
279
280    /**
281     * Reads a 16-bit integer from the stream message.
282     *
283     * @return a 16-bit integer from the stream message
284     * @throws JMSException if the JMS provider fails to read the message due to
285     *                 some internal error.
286     * @throws MessageEOFException if unexpected end of message stream has been
287     *                 reached.
288     * @throws MessageFormatException if this type conversion is invalid.
289     * @throws MessageNotReadableException if the message is in write-only mode.
290     */
291
292    @Override
293    public short readShort() throws JMSException {
294        initializeReading();
295        try {
296
297            this.dataIn.mark(17);
298            int type = this.dataIn.read();
299            if (type == -1) {
300                throw new MessageEOFException("reached end of data");
301            }
302            if (type == MarshallingSupport.SHORT_TYPE) {
303                return this.dataIn.readShort();
304            }
305            if (type == MarshallingSupport.BYTE_TYPE) {
306                return this.dataIn.readByte();
307            }
308            if (type == MarshallingSupport.STRING_TYPE) {
309                return Short.valueOf(this.dataIn.readUTF()).shortValue();
310            }
311            if (type == MarshallingSupport.NULL) {
312                this.dataIn.reset();
313                throw new NullPointerException("Cannot convert NULL value to short.");
314            } else {
315                this.dataIn.reset();
316                throw new MessageFormatException(" not a short type");
317            }
318        } catch (NumberFormatException mfe) {
319            try {
320                this.dataIn.reset();
321            } catch (IOException ioe) {
322                throw JMSExceptionSupport.create(ioe);
323            }
324            throw mfe;
325
326        } catch (EOFException e) {
327            throw JMSExceptionSupport.createMessageEOFException(e);
328        } catch (IOException e) {
329            throw JMSExceptionSupport.createMessageFormatException(e);
330        }
331
332    }
333
334    /**
335     * Reads a Unicode character value from the stream message.
336     *
337     * @return a Unicode character from the stream message
338     * @throws JMSException if the JMS provider fails to read the message due to
339     *                 some internal error.
340     * @throws MessageEOFException if unexpected end of message stream has been
341     *                 reached.
342     * @throws MessageFormatException if this type conversion is invalid
343     * @throws MessageNotReadableException if the message is in write-only mode.
344     */
345
346    @Override
347    public char readChar() throws JMSException {
348        initializeReading();
349        try {
350
351            this.dataIn.mark(17);
352            int type = this.dataIn.read();
353            if (type == -1) {
354                throw new MessageEOFException("reached end of data");
355            }
356            if (type == MarshallingSupport.CHAR_TYPE) {
357                return this.dataIn.readChar();
358            }
359            if (type == MarshallingSupport.NULL) {
360                this.dataIn.reset();
361                throw new NullPointerException("Cannot convert NULL value to char.");
362            } else {
363                this.dataIn.reset();
364                throw new MessageFormatException(" not a char type");
365            }
366        } catch (NumberFormatException mfe) {
367            try {
368                this.dataIn.reset();
369            } catch (IOException ioe) {
370                throw JMSExceptionSupport.create(ioe);
371            }
372            throw mfe;
373
374        } catch (EOFException e) {
375            throw JMSExceptionSupport.createMessageEOFException(e);
376        } catch (IOException e) {
377            throw JMSExceptionSupport.createMessageFormatException(e);
378        }
379    }
380
381    /**
382     * Reads a 32-bit integer from the stream message.
383     *
384     * @return a 32-bit integer value from the stream message, interpreted as an
385     *         <code>int</code>
386     * @throws JMSException if the JMS provider fails to read the message due to
387     *                 some internal error.
388     * @throws MessageEOFException if unexpected end of message stream has been
389     *                 reached.
390     * @throws MessageFormatException if this type conversion is invalid.
391     * @throws MessageNotReadableException if the message is in write-only mode.
392     */
393
394    @Override
395    public int readInt() throws JMSException {
396        initializeReading();
397        try {
398
399            this.dataIn.mark(33);
400            int type = this.dataIn.read();
401            if (type == -1) {
402                throw new MessageEOFException("reached end of data");
403            }
404            if (type == MarshallingSupport.INTEGER_TYPE) {
405                return this.dataIn.readInt();
406            }
407            if (type == MarshallingSupport.SHORT_TYPE) {
408                return this.dataIn.readShort();
409            }
410            if (type == MarshallingSupport.BYTE_TYPE) {
411                return this.dataIn.readByte();
412            }
413            if (type == MarshallingSupport.STRING_TYPE) {
414                return Integer.valueOf(this.dataIn.readUTF()).intValue();
415            }
416            if (type == MarshallingSupport.NULL) {
417                this.dataIn.reset();
418                throw new NullPointerException("Cannot convert NULL value to int.");
419            } else {
420                this.dataIn.reset();
421                throw new MessageFormatException(" not an int type");
422            }
423        } catch (NumberFormatException mfe) {
424            try {
425                this.dataIn.reset();
426            } catch (IOException ioe) {
427                throw JMSExceptionSupport.create(ioe);
428            }
429            throw mfe;
430
431        } catch (EOFException e) {
432            throw JMSExceptionSupport.createMessageEOFException(e);
433        } catch (IOException e) {
434            throw JMSExceptionSupport.createMessageFormatException(e);
435        }
436    }
437
438    /**
439     * Reads a 64-bit integer from the stream message.
440     *
441     * @return a 64-bit integer value from the stream message, interpreted as a
442     *         <code>long</code>
443     * @throws JMSException if the JMS provider fails to read the message due to
444     *                 some internal error.
445     * @throws MessageEOFException if unexpected end of message stream has been
446     *                 reached.
447     * @throws MessageFormatException if this type conversion is invalid.
448     * @throws MessageNotReadableException if the message is in write-only mode.
449     */
450
451    @Override
452    public long readLong() throws JMSException {
453        initializeReading();
454        try {
455
456            this.dataIn.mark(65);
457            int type = this.dataIn.read();
458            if (type == -1) {
459                throw new MessageEOFException("reached end of data");
460            }
461            if (type == MarshallingSupport.LONG_TYPE) {
462                return this.dataIn.readLong();
463            }
464            if (type == MarshallingSupport.INTEGER_TYPE) {
465                return this.dataIn.readInt();
466            }
467            if (type == MarshallingSupport.SHORT_TYPE) {
468                return this.dataIn.readShort();
469            }
470            if (type == MarshallingSupport.BYTE_TYPE) {
471                return this.dataIn.readByte();
472            }
473            if (type == MarshallingSupport.STRING_TYPE) {
474                return Long.valueOf(this.dataIn.readUTF()).longValue();
475            }
476            if (type == MarshallingSupport.NULL) {
477                this.dataIn.reset();
478                throw new NullPointerException("Cannot convert NULL value to long.");
479            } else {
480                this.dataIn.reset();
481                throw new MessageFormatException(" not a long type");
482            }
483        } catch (NumberFormatException mfe) {
484            try {
485                this.dataIn.reset();
486            } catch (IOException ioe) {
487                throw JMSExceptionSupport.create(ioe);
488            }
489            throw mfe;
490
491        } catch (EOFException e) {
492            throw JMSExceptionSupport.createMessageEOFException(e);
493        } catch (IOException e) {
494            throw JMSExceptionSupport.createMessageFormatException(e);
495        }
496    }
497
498    /**
499     * Reads a <code>float</code> from the stream message.
500     *
501     * @return a <code>float</code> value from the stream message
502     * @throws JMSException if the JMS provider fails to read the message due to
503     *                 some internal error.
504     * @throws MessageEOFException if unexpected end of message stream has been
505     *                 reached.
506     * @throws MessageFormatException if this type conversion is invalid.
507     * @throws MessageNotReadableException if the message is in write-only mode.
508     */
509
510    @Override
511    public float readFloat() throws JMSException {
512        initializeReading();
513        try {
514            this.dataIn.mark(33);
515            int type = this.dataIn.read();
516            if (type == -1) {
517                throw new MessageEOFException("reached end of data");
518            }
519            if (type == MarshallingSupport.FLOAT_TYPE) {
520                return this.dataIn.readFloat();
521            }
522            if (type == MarshallingSupport.STRING_TYPE) {
523                return Float.valueOf(this.dataIn.readUTF()).floatValue();
524            }
525            if (type == MarshallingSupport.NULL) {
526                this.dataIn.reset();
527                throw new NullPointerException("Cannot convert NULL value to float.");
528            } else {
529                this.dataIn.reset();
530                throw new MessageFormatException(" not a float type");
531            }
532        } catch (NumberFormatException mfe) {
533            try {
534                this.dataIn.reset();
535            } catch (IOException ioe) {
536                throw JMSExceptionSupport.create(ioe);
537            }
538            throw mfe;
539
540        } catch (EOFException e) {
541            throw JMSExceptionSupport.createMessageEOFException(e);
542        } catch (IOException e) {
543            throw JMSExceptionSupport.createMessageFormatException(e);
544        }
545    }
546
547    /**
548     * Reads a <code>double</code> from the stream message.
549     *
550     * @return a <code>double</code> value from the stream message
551     * @throws JMSException if the JMS provider fails to read the message due to
552     *                 some internal error.
553     * @throws MessageEOFException if unexpected end of message stream has been
554     *                 reached.
555     * @throws MessageFormatException if this type conversion is invalid.
556     * @throws MessageNotReadableException if the message is in write-only mode.
557     */
558
559    @Override
560    public double readDouble() throws JMSException {
561        initializeReading();
562        try {
563
564            this.dataIn.mark(65);
565            int type = this.dataIn.read();
566            if (type == -1) {
567                throw new MessageEOFException("reached end of data");
568            }
569            if (type == MarshallingSupport.DOUBLE_TYPE) {
570                return this.dataIn.readDouble();
571            }
572            if (type == MarshallingSupport.FLOAT_TYPE) {
573                return this.dataIn.readFloat();
574            }
575            if (type == MarshallingSupport.STRING_TYPE) {
576                return Double.valueOf(this.dataIn.readUTF()).doubleValue();
577            }
578            if (type == MarshallingSupport.NULL) {
579                this.dataIn.reset();
580                throw new NullPointerException("Cannot convert NULL value to double.");
581            } else {
582                this.dataIn.reset();
583                throw new MessageFormatException(" not a double type");
584            }
585        } catch (NumberFormatException mfe) {
586            try {
587                this.dataIn.reset();
588            } catch (IOException ioe) {
589                throw JMSExceptionSupport.create(ioe);
590            }
591            throw mfe;
592
593        } catch (EOFException e) {
594            throw JMSExceptionSupport.createMessageEOFException(e);
595        } catch (IOException e) {
596            throw JMSExceptionSupport.createMessageFormatException(e);
597        }
598    }
599
600    /**
601     * Reads a <CODE>String</CODE> from the stream message.
602     *
603     * @return a Unicode string from the stream message
604     * @throws JMSException if the JMS provider fails to read the message due to
605     *                 some internal error.
606     * @throws MessageEOFException if unexpected end of message stream has been
607     *                 reached.
608     * @throws MessageFormatException if this type conversion is invalid.
609     * @throws MessageNotReadableException if the message is in write-only mode.
610     */
611
612    @Override
613    public String readString() throws JMSException {
614        initializeReading();
615        try {
616
617            this.dataIn.mark(65);
618            int type = this.dataIn.read();
619            if (type == -1) {
620                throw new MessageEOFException("reached end of data");
621            }
622            if (type == MarshallingSupport.NULL) {
623                return null;
624            }
625            if (type == MarshallingSupport.BIG_STRING_TYPE) {
626                return MarshallingSupport.readUTF8(dataIn);
627            }
628            if (type == MarshallingSupport.STRING_TYPE) {
629                return this.dataIn.readUTF();
630            }
631            if (type == MarshallingSupport.LONG_TYPE) {
632                return new Long(this.dataIn.readLong()).toString();
633            }
634            if (type == MarshallingSupport.INTEGER_TYPE) {
635                return new Integer(this.dataIn.readInt()).toString();
636            }
637            if (type == MarshallingSupport.SHORT_TYPE) {
638                return new Short(this.dataIn.readShort()).toString();
639            }
640            if (type == MarshallingSupport.BYTE_TYPE) {
641                return new Byte(this.dataIn.readByte()).toString();
642            }
643            if (type == MarshallingSupport.FLOAT_TYPE) {
644                return new Float(this.dataIn.readFloat()).toString();
645            }
646            if (type == MarshallingSupport.DOUBLE_TYPE) {
647                return new Double(this.dataIn.readDouble()).toString();
648            }
649            if (type == MarshallingSupport.BOOLEAN_TYPE) {
650                return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
651            }
652            if (type == MarshallingSupport.CHAR_TYPE) {
653                return new Character(this.dataIn.readChar()).toString();
654            } else {
655                this.dataIn.reset();
656                throw new MessageFormatException(" not a String type");
657            }
658        } catch (NumberFormatException mfe) {
659            try {
660                this.dataIn.reset();
661            } catch (IOException ioe) {
662                throw JMSExceptionSupport.create(ioe);
663            }
664            throw mfe;
665
666        } catch (EOFException e) {
667            throw JMSExceptionSupport.createMessageEOFException(e);
668        } catch (IOException e) {
669            throw JMSExceptionSupport.createMessageFormatException(e);
670        }
671    }
672
673    /**
674     * Reads a byte array field from the stream message into the specified
675     * <CODE>byte[]</CODE> object (the read buffer). <p/>
676     * <P>
677     * To read the field value, <CODE>readBytes</CODE> should be successively
678     * called until it returns a value less than the length of the read buffer.
679     * The value of the bytes in the buffer following the last byte read is
680     * undefined. <p/>
681     * <P>
682     * If <CODE>readBytes</CODE> returns a value equal to the length of the
683     * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
684     * are no more bytes to be read, this call returns -1. <p/>
685     * <P>
686     * If the byte array field value is null, <CODE>readBytes</CODE> returns
687     * -1. <p/>
688     * <P>
689     * If the byte array field value is empty, <CODE>readBytes</CODE> returns
690     * 0. <p/>
691     * <P>
692     * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE>
693     * field value has been made, the full value of the field must be read
694     * before it is valid to read the next field. An attempt to read the next
695     * field before that has been done will throw a
696     * <CODE>MessageFormatException</CODE>. <p/>
697     * <P>
698     * To read the byte field value into a new <CODE>byte[]</CODE> object, use
699     * the <CODE>readObject</CODE> method.
700     *
701     * @param value the buffer into which the data is read
702     * @return the total number of bytes read into the buffer, or -1 if there is
703     *         no more data because the end of the byte field has been reached
704     * @throws JMSException if the JMS provider fails to read the message due to
705     *                 some internal error.
706     * @throws MessageEOFException if unexpected end of message stream has been
707     *                 reached.
708     * @throws MessageFormatException if this type conversion is invalid.
709     * @throws MessageNotReadableException if the message is in write-only mode.
710     * @see #readObject()
711     */
712
713    @Override
714    public int readBytes(byte[] value) throws JMSException {
715
716        initializeReading();
717        try {
718            if (value == null) {
719                throw new NullPointerException();
720            }
721
722            if (remainingBytes == -1) {
723                this.dataIn.mark(value.length + 1);
724                int type = this.dataIn.read();
725                if (type == -1) {
726                    throw new MessageEOFException("reached end of data");
727                }
728                if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
729                    throw new MessageFormatException("Not a byte array");
730                }
731                remainingBytes = this.dataIn.readInt();
732            } else if (remainingBytes == 0) {
733                remainingBytes = -1;
734                return -1;
735            }
736
737            if (value.length <= remainingBytes) {
738                // small buffer
739                remainingBytes -= value.length;
740                this.dataIn.readFully(value);
741                return value.length;
742            } else {
743                // big buffer
744                int rc = this.dataIn.read(value, 0, remainingBytes);
745                remainingBytes = 0;
746                return rc;
747            }
748
749        } catch (EOFException e) {
750            JMSException jmsEx = new MessageEOFException(e.getMessage());
751            jmsEx.setLinkedException(e);
752            throw jmsEx;
753        } catch (IOException e) {
754            JMSException jmsEx = new MessageFormatException(e.getMessage());
755            jmsEx.setLinkedException(e);
756            throw jmsEx;
757        }
758    }
759
760    /**
761     * Reads an object from the stream message. <p/>
762     * <P>
763     * This method can be used to return, in objectified format, an object in
764     * the Java programming language ("Java object") that has been written to
765     * the stream with the equivalent <CODE>writeObject</CODE> method call, or
766     * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
767     * <P>
768     * Note that byte values are returned as <CODE>byte[]</CODE>, not
769     * <CODE>Byte[]</CODE>. <p/>
770     * <P>
771     * An attempt to call <CODE>readObject</CODE> to read a byte field value
772     * into a new <CODE>byte[]</CODE> object before the full value of the byte
773     * field has been read will throw a <CODE>MessageFormatException</CODE>.
774     *
775     * @return a Java object from the stream message, in objectified format (for
776     *         example, if the object was written as an <CODE>int</CODE>, an
777     *         <CODE>Integer</CODE> is returned)
778     * @throws JMSException if the JMS provider fails to read the message due to
779     *                 some internal error.
780     * @throws MessageEOFException if unexpected end of message stream has been
781     *                 reached.
782     * @throws MessageFormatException if this type conversion is invalid.
783     * @throws MessageNotReadableException if the message is in write-only mode.
784     * @see #readBytes(byte[] value)
785     */
786
787    @Override
788    public Object readObject() throws JMSException {
789        initializeReading();
790        try {
791            this.dataIn.mark(65);
792            int type = this.dataIn.read();
793            if (type == -1) {
794                throw new MessageEOFException("reached end of data");
795            }
796            if (type == MarshallingSupport.NULL) {
797                return null;
798            }
799            if (type == MarshallingSupport.BIG_STRING_TYPE) {
800                return MarshallingSupport.readUTF8(dataIn);
801            }
802            if (type == MarshallingSupport.STRING_TYPE) {
803                return this.dataIn.readUTF();
804            }
805            if (type == MarshallingSupport.LONG_TYPE) {
806                return Long.valueOf(this.dataIn.readLong());
807            }
808            if (type == MarshallingSupport.INTEGER_TYPE) {
809                return Integer.valueOf(this.dataIn.readInt());
810            }
811            if (type == MarshallingSupport.SHORT_TYPE) {
812                return Short.valueOf(this.dataIn.readShort());
813            }
814            if (type == MarshallingSupport.BYTE_TYPE) {
815                return Byte.valueOf(this.dataIn.readByte());
816            }
817            if (type == MarshallingSupport.FLOAT_TYPE) {
818                return new Float(this.dataIn.readFloat());
819            }
820            if (type == MarshallingSupport.DOUBLE_TYPE) {
821                return new Double(this.dataIn.readDouble());
822            }
823            if (type == MarshallingSupport.BOOLEAN_TYPE) {
824                return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
825            }
826            if (type == MarshallingSupport.CHAR_TYPE) {
827                return Character.valueOf(this.dataIn.readChar());
828            }
829            if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
830                int len = this.dataIn.readInt();
831                byte[] value = new byte[len];
832                this.dataIn.readFully(value);
833                return value;
834            } else {
835                this.dataIn.reset();
836                throw new MessageFormatException("unknown type");
837            }
838        } catch (NumberFormatException mfe) {
839            try {
840                this.dataIn.reset();
841            } catch (IOException ioe) {
842                throw JMSExceptionSupport.create(ioe);
843            }
844            throw mfe;
845
846        } catch (EOFException e) {
847            JMSException jmsEx = new MessageEOFException(e.getMessage());
848            jmsEx.setLinkedException(e);
849            throw jmsEx;
850        } catch (IOException e) {
851            JMSException jmsEx = new MessageFormatException(e.getMessage());
852            jmsEx.setLinkedException(e);
853            throw jmsEx;
854        }
855    }
856
857    /**
858     * Writes a <code>boolean</code> to the stream message. The value
859     * <code>true</code> is written as the value <code>(byte)1</code>; the
860     * value <code>false</code> is written as the value <code>(byte)0</code>.
861     *
862     * @param value the <code>boolean</code> value to be written
863     * @throws JMSException if the JMS provider fails to write the message due
864     *                 to some internal error.
865     * @throws MessageNotWriteableException if the message is in read-only mode.
866     */
867
868    @Override
869    public void writeBoolean(boolean value) throws JMSException {
870        initializeWriting();
871        try {
872            MarshallingSupport.marshalBoolean(dataOut, value);
873        } catch (IOException ioe) {
874            throw JMSExceptionSupport.create(ioe);
875        }
876    }
877
878    /**
879     * Writes a <code>byte</code> to the stream message.
880     *
881     * @param value the <code>byte</code> value to be written
882     * @throws JMSException if the JMS provider fails to write the message due
883     *                 to some internal error.
884     * @throws MessageNotWriteableException if the message is in read-only mode.
885     */
886
887    @Override
888    public void writeByte(byte value) throws JMSException {
889        initializeWriting();
890        try {
891            MarshallingSupport.marshalByte(dataOut, value);
892        } catch (IOException ioe) {
893            throw JMSExceptionSupport.create(ioe);
894        }
895    }
896
897    /**
898     * Writes a <code>short</code> to the stream message.
899     *
900     * @param value the <code>short</code> value to be written
901     * @throws JMSException if the JMS provider fails to write the message due
902     *                 to some internal error.
903     * @throws MessageNotWriteableException if the message is in read-only mode.
904     */
905
906    @Override
907    public void writeShort(short value) throws JMSException {
908        initializeWriting();
909        try {
910            MarshallingSupport.marshalShort(dataOut, value);
911        } catch (IOException ioe) {
912            throw JMSExceptionSupport.create(ioe);
913        }
914    }
915
916    /**
917     * Writes a <code>char</code> to the stream message.
918     *
919     * @param value the <code>char</code> value to be written
920     * @throws JMSException if the JMS provider fails to write the message due
921     *                 to some internal error.
922     * @throws MessageNotWriteableException if the message is in read-only mode.
923     */
924
925    @Override
926    public void writeChar(char value) throws JMSException {
927        initializeWriting();
928        try {
929            MarshallingSupport.marshalChar(dataOut, value);
930        } catch (IOException ioe) {
931            throw JMSExceptionSupport.create(ioe);
932        }
933    }
934
935    /**
936     * Writes an <code>int</code> to the stream message.
937     *
938     * @param value the <code>int</code> value to be written
939     * @throws JMSException if the JMS provider fails to write the message due
940     *                 to some internal error.
941     * @throws MessageNotWriteableException if the message is in read-only mode.
942     */
943
944    @Override
945    public void writeInt(int value) throws JMSException {
946        initializeWriting();
947        try {
948            MarshallingSupport.marshalInt(dataOut, value);
949        } catch (IOException ioe) {
950            throw JMSExceptionSupport.create(ioe);
951        }
952    }
953
954    /**
955     * Writes a <code>long</code> to the stream message.
956     *
957     * @param value the <code>long</code> value to be written
958     * @throws JMSException if the JMS provider fails to write the message due
959     *                 to some internal error.
960     * @throws MessageNotWriteableException if the message is in read-only mode.
961     */
962
963    @Override
964    public void writeLong(long value) throws JMSException {
965        initializeWriting();
966        try {
967            MarshallingSupport.marshalLong(dataOut, value);
968        } catch (IOException ioe) {
969            throw JMSExceptionSupport.create(ioe);
970        }
971    }
972
973    /**
974     * Writes a <code>float</code> to the stream message.
975     *
976     * @param value the <code>float</code> value to be written
977     * @throws JMSException if the JMS provider fails to write the message due
978     *                 to some internal error.
979     * @throws MessageNotWriteableException if the message is in read-only mode.
980     */
981
982    @Override
983    public void writeFloat(float value) throws JMSException {
984        initializeWriting();
985        try {
986            MarshallingSupport.marshalFloat(dataOut, value);
987        } catch (IOException ioe) {
988            throw JMSExceptionSupport.create(ioe);
989        }
990    }
991
992    /**
993     * Writes a <code>double</code> to the stream message.
994     *
995     * @param value the <code>double</code> value to be written
996     * @throws JMSException if the JMS provider fails to write the message due
997     *                 to some internal error.
998     * @throws MessageNotWriteableException if the message is in read-only mode.
999     */
1000
1001    @Override
1002    public void writeDouble(double value) throws JMSException {
1003        initializeWriting();
1004        try {
1005            MarshallingSupport.marshalDouble(dataOut, value);
1006        } catch (IOException ioe) {
1007            throw JMSExceptionSupport.create(ioe);
1008        }
1009    }
1010
1011    /**
1012     * Writes a <code>String</code> to the stream message.
1013     *
1014     * @param value the <code>String</code> value to be written
1015     * @throws JMSException if the JMS provider fails to write the message due
1016     *                 to some internal error.
1017     * @throws MessageNotWriteableException if the message is in read-only mode.
1018     */
1019
1020    @Override
1021    public void writeString(String value) throws JMSException {
1022        initializeWriting();
1023        try {
1024            if (value == null) {
1025                MarshallingSupport.marshalNull(dataOut);
1026            } else {
1027                MarshallingSupport.marshalString(dataOut, value);
1028            }
1029        } catch (IOException ioe) {
1030            throw JMSExceptionSupport.create(ioe);
1031        }
1032    }
1033
1034    /**
1035     * Writes a byte array field to the stream message. <p/>
1036     * <P>
1037     * The byte array <code>value</code> is written to the message as a byte
1038     * array field. Consecutively written byte array fields are treated as two
1039     * distinct fields when the fields are read.
1040     *
1041     * @param value the byte array value to be written
1042     * @throws JMSException if the JMS provider fails to write the message due
1043     *                 to some internal error.
1044     * @throws MessageNotWriteableException if the message is in read-only mode.
1045     */
1046
1047    @Override
1048    public void writeBytes(byte[] value) throws JMSException {
1049        writeBytes(value, 0, value.length);
1050    }
1051
1052    /**
1053     * Writes a portion of a byte array as a byte array field to the stream
1054     * message. <p/>
1055     * <P>
1056     * The a portion of the byte array <code>value</code> is written to the
1057     * message as a byte array field. Consecutively written byte array fields
1058     * are treated as two distinct fields when the fields are read.
1059     *
1060     * @param value the byte array value to be written
1061     * @param offset the initial offset within the byte array
1062     * @param length the number of bytes to use
1063     * @throws JMSException if the JMS provider fails to write the message due
1064     *                 to some internal error.
1065     * @throws MessageNotWriteableException if the message is in read-only mode.
1066     */
1067
1068    @Override
1069    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
1070        initializeWriting();
1071        try {
1072            MarshallingSupport.marshalByteArray(dataOut, value, offset, length);
1073        } catch (IOException ioe) {
1074            throw JMSExceptionSupport.create(ioe);
1075        }
1076    }
1077
1078    /**
1079     * Writes an object to the stream message. <p/>
1080     * <P>
1081     * This method works only for the objectified primitive object types (<code>Integer</code>,
1082     * <code>Double</code>, <code>Long</code>&nbsp;...),
1083     * <code>String</code> objects, and byte arrays.
1084     *
1085     * @param value the Java object to be written
1086     * @throws JMSException if the JMS provider fails to write the message due
1087     *                 to some internal error.
1088     * @throws MessageFormatException if the object is invalid.
1089     * @throws MessageNotWriteableException if the message is in read-only mode.
1090     */
1091
1092    @Override
1093    public void writeObject(Object value) throws JMSException {
1094        initializeWriting();
1095        if (value == null) {
1096            try {
1097                MarshallingSupport.marshalNull(dataOut);
1098            } catch (IOException ioe) {
1099                throw JMSExceptionSupport.create(ioe);
1100            }
1101        } else if (value instanceof String) {
1102            writeString(value.toString());
1103        } else if (value instanceof Character) {
1104            writeChar(((Character)value).charValue());
1105        } else if (value instanceof Boolean) {
1106            writeBoolean(((Boolean)value).booleanValue());
1107        } else if (value instanceof Byte) {
1108            writeByte(((Byte)value).byteValue());
1109        } else if (value instanceof Short) {
1110            writeShort(((Short)value).shortValue());
1111        } else if (value instanceof Integer) {
1112            writeInt(((Integer)value).intValue());
1113        } else if (value instanceof Float) {
1114            writeFloat(((Float)value).floatValue());
1115        } else if (value instanceof Double) {
1116            writeDouble(((Double)value).doubleValue());
1117        } else if (value instanceof byte[]) {
1118            writeBytes((byte[])value);
1119        }else if (value instanceof Long) {
1120            writeLong(((Long)value).longValue());
1121        }else {
1122            throw new MessageFormatException("Unsupported Object type: " + value.getClass());
1123        }
1124    }
1125
1126    /**
1127     * Puts the message body in read-only mode and repositions the stream of
1128     * bytes to the beginning.
1129     *
1130     * @throws JMSException if an internal error occurs
1131     */
1132
1133    @Override
1134    public void reset() throws JMSException {
1135        storeContent();
1136        this.bytesOut = null;
1137        this.dataIn = null;
1138        this.dataOut = null;
1139        this.remainingBytes = -1;
1140        setReadOnlyBody(true);
1141    }
1142
1143    private void initializeWriting() throws JMSException {
1144        checkReadOnlyBody();
1145        if (this.dataOut == null) {
1146            this.bytesOut = new ByteArrayOutputStream();
1147            OutputStream os = bytesOut;
1148            ActiveMQConnection connection = getConnection();
1149            if (connection != null && connection.isUseCompression()) {
1150                compressed = true;
1151                os = new DeflaterOutputStream(os);
1152            }
1153            this.dataOut = new DataOutputStream(os);
1154        }
1155
1156        // For a message that already had a body and was sent we need to restore the content
1157        // if the message is used again without having its clearBody method called.
1158        if (this.content != null && this.content.length > 0) {
1159            try {
1160                if (compressed) {
1161                    ByteArrayInputStream input = new ByteArrayInputStream(this.content.getData(), this.content.getOffset(), this.content.getLength());
1162                    InflaterInputStream inflater = new InflaterInputStream(input);
1163                    try {
1164                        byte[] buffer = new byte[8*1024];
1165                        int read = 0;
1166                        while ((read = inflater.read(buffer)) != -1) {
1167                            this.dataOut.write(buffer, 0, read);
1168                        }
1169                    } finally {
1170                        inflater.close();
1171                    }
1172                } else {
1173                    this.dataOut.write(this.content.getData(), this.content.getOffset(), this.content.getLength());
1174                }
1175                // Free up the buffer from the old content, will be re-written when
1176                // tbe message is sent again and storeContent() is called.
1177                this.content = null;
1178            } catch (IOException ioe) {
1179                throw JMSExceptionSupport.create(ioe);
1180            }
1181        }
1182    }
1183
1184    protected void checkWriteOnlyBody() throws MessageNotReadableException {
1185        if (!readOnlyBody) {
1186            throw new MessageNotReadableException("Message body is write-only");
1187        }
1188    }
1189
1190    private void initializeReading() throws MessageNotReadableException {
1191        checkWriteOnlyBody();
1192        if (this.dataIn == null) {
1193            ByteSequence data = getContent();
1194            if (data == null) {
1195                data = new ByteSequence(new byte[] {}, 0, 0);
1196            }
1197            InputStream is = new ByteArrayInputStream(data);
1198            if (isCompressed()) {
1199                is = new InflaterInputStream(is);
1200                is = new BufferedInputStream(is);
1201            }
1202            this.dataIn = new DataInputStream(is);
1203        }
1204    }
1205
1206    @Override
1207    public void compress() throws IOException {
1208        storeContent();
1209        super.compress();
1210    }
1211
1212    @Override
1213    public String toString() {
1214        return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
1215    }
1216}