001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.EOFException;
022    import java.io.FilterOutputStream;
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.io.OutputStream;
026    import java.util.zip.Deflater;
027    import java.util.zip.DeflaterOutputStream;
028    import java.util.zip.InflaterInputStream;
029    
030    import javax.jms.BytesMessage;
031    import javax.jms.JMSException;
032    import javax.jms.MessageEOFException;
033    import javax.jms.MessageFormatException;
034    import javax.jms.MessageNotReadableException;
035    import javax.jms.MessageNotWriteableException;
036    
037    import org.apache.activemq.ActiveMQConnection;
038    import org.apache.activemq.util.ByteArrayInputStream;
039    import org.apache.activemq.util.ByteArrayOutputStream;
040    import org.apache.activemq.util.ByteSequence;
041    import org.apache.activemq.util.ByteSequenceData;
042    import org.apache.activemq.util.JMSExceptionSupport;
043    
044    /**
045     * A <CODE>BytesMessage</CODE> object is used to send a message containing a
046     * stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE>
047     * interface and adds a bytes message body. The receiver of the message supplies
048     * the interpretation of the bytes.
049     * <P>
050     * The <CODE>BytesMessage</CODE> methods are based largely on those found in
051     * <CODE>java.io.DataInputStream</CODE> and
052     * <CODE>java.io.DataOutputStream</CODE>.
053     * <P>
054     * This message type is for client encoding of existing message formats. If
055     * possible, one of the other self-defining message types should be used
056     * instead.
057     * <P>
058     * Although the JMS API allows the use of message properties with byte messages,
059     * they are typically not used, since the inclusion of properties may affect the
060     * format.
061     * <P>
062     * The primitive types can be written explicitly using methods for each type.
063     * They may also be written generically as objects. For instance, a call to
064     * <CODE>BytesMessage.writeInt(6)</CODE> is equivalent to
065     * <CODE> BytesMessage.writeObject(new Integer(6))</CODE>. Both forms are
066     * provided, because the explicit form is convenient for static programming, and
067     * the object form is needed when types are not known at compile time.
068     * <P>
069     * When the message is first created, and when <CODE>clearBody</CODE> is
070     * called, the body of the message is in write-only mode. After the first call
071     * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
072     * After a message has been sent, the client that sent it can retain and modify
073     * it without affecting the message that has been sent. The same message object
074     * can be sent multiple times. When a message has been received, the provider
075     * has called <CODE>reset</CODE> so that the message body is in read-only mode
076     * for the client.
077     * <P>
078     * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
079     * message body is cleared and the message is in write-only mode.
080     * <P>
081     * If a client attempts to read a message in write-only mode, a
082     * <CODE>MessageNotReadableException</CODE> is thrown.
083     * <P>
084     * If a client attempts to write a message in read-only mode, a
085     * <CODE>MessageNotWriteableException</CODE> is thrown.
086     *
087     * @openwire:marshaller code=24
088     * @see javax.jms.Session#createBytesMessage()
089     * @see javax.jms.MapMessage
090     * @see javax.jms.Message
091     * @see javax.jms.ObjectMessage
092     * @see javax.jms.StreamMessage
093     * @see javax.jms.TextMessage
094     */
095    public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage {
096    
    /** Type code reported by {@link #getDataStructureType()} for this message class. */
    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;

    /** Stream the {@code writeXxx} methods write to; reset to {@code null} by {@code storeContent()} and {@code clearBody()}. */
    protected transient DataOutputStream dataOut;
    /** Buffer whose bytes {@code storeContent()} turns into the message content. */
    protected transient ByteArrayOutputStream bytesOut;
    /** Stream the {@code readXxx} methods read from; reset to {@code null} by {@code clearBody()}. */
    protected transient DataInputStream dataIn;
    /** Value returned by {@code getBodyLength()}; also written into the content by {@code storeContent()} when the message is compressed. */
    protected transient int length;
103    
104        @Override
105        public Message copy() {
106            ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
107            copy(copy);
108            return copy;
109        }
110    
111        private void copy(ActiveMQBytesMessage copy) {
112            storeContent();
113            super.copy(copy);
114            copy.dataOut = null;
115            copy.bytesOut = null;
116            copy.dataIn = null;
117        }
118    
119        @Override
120        public void onSend() throws JMSException {
121            super.onSend();
122            storeContent();
123        }
124    
125        @Override
126        public void storeContent() {
127            try {
128                if (dataOut != null) {
129                    dataOut.close();
130                    ByteSequence bs = bytesOut.toByteSequence();
131                    if (compressed) {
132                        int pos = bs.offset;
133                        ByteSequenceData.writeIntBig(bs, length);
134                        bs.offset = pos;
135                    }
136                    setContent(bs);
137                    bytesOut = null;
138                    dataOut = null;
139                }
140            } catch (IOException ioe) {
141                throw new RuntimeException(ioe.getMessage(), ioe); // TODO verify
142                                                                    // RuntimeException
143            }
144        }
145    
146        @Override
147        public byte getDataStructureType() {
148            return DATA_STRUCTURE_TYPE;
149        }
150    
151        @Override
152        public String getJMSXMimeType() {
153            return "jms/bytes-message";
154        }
155    
156        /**
157         * Clears out the message body. Clearing a message's body does not clear its
158         * header values or property entries.
159         * <P>
160         * If this message body was read-only, calling this method leaves the
161         * message body in the same state as an empty body in a newly created
162         * message.
163         *
164         * @throws JMSException if the JMS provider fails to clear the message body
165         *                 due to some internal error.
166         */
167        @Override
168        public void clearBody() throws JMSException {
169            super.clearBody();
170            this.dataOut = null;
171            this.dataIn = null;
172            this.bytesOut = null;
173        }
174    
175        /**
176         * Gets the number of bytes of the message body when the message is in
177         * read-only mode. The value returned can be used to allocate a byte array.
178         * The value returned is the entire length of the message body, regardless
179         * of where the pointer for reading the message is currently located.
180         *
181         * @return number of bytes in the message
182         * @throws JMSException if the JMS provider fails to read the message due to
183         *                 some internal error.
184         * @throws MessageNotReadableException if the message is in write-only mode.
185         * @since 1.1
186         */
187    
188        @Override
189        public long getBodyLength() throws JMSException {
190            initializeReading();
191            return length;
192        }
193    
194        /**
195         * Reads a <code>boolean</code> from the bytes message stream.
196         *
197         * @return the <code>boolean</code> value read
198         * @throws JMSException if the JMS provider fails to read the message due to
199         *                 some internal error.
200         * @throws MessageEOFException if unexpected end of bytes stream has been
201         *                 reached.
202         * @throws MessageNotReadableException if the message is in write-only mode.
203         */
204        @Override
205        public boolean readBoolean() throws JMSException {
206            initializeReading();
207            try {
208                return this.dataIn.readBoolean();
209            } catch (EOFException e) {
210                throw JMSExceptionSupport.createMessageEOFException(e);
211            } catch (IOException e) {
212                throw JMSExceptionSupport.createMessageFormatException(e);
213            }
214        }
215    
216        /**
217         * Reads a signed 8-bit value from the bytes message stream.
218         *
219         * @return the next byte from the bytes message stream as a signed 8-bit
220         *         <code>byte</code>
221         * @throws JMSException if the JMS provider fails to read the message due to
222         *                 some internal error.
223         * @throws MessageEOFException if unexpected end of bytes stream has been
224         *                 reached.
225         * @throws MessageNotReadableException if the message is in write-only mode.
226         */
227        @Override
228        public byte readByte() throws JMSException {
229            initializeReading();
230            try {
231                return this.dataIn.readByte();
232            } catch (EOFException e) {
233                throw JMSExceptionSupport.createMessageEOFException(e);
234            } catch (IOException e) {
235                throw JMSExceptionSupport.createMessageFormatException(e);
236            }
237        }
238    
239        /**
240         * Reads an unsigned 8-bit number from the bytes message stream.
241         *
242         * @return the next byte from the bytes message stream, interpreted as an
243         *         unsigned 8-bit number
244         * @throws JMSException if the JMS provider fails to read the message due to
245         *                 some internal error.
246         * @throws MessageEOFException if unexpected end of bytes stream has been
247         *                 reached.
248         * @throws MessageNotReadableException if the message is in write-only mode.
249         */
250        @Override
251        public int readUnsignedByte() throws JMSException {
252            initializeReading();
253            try {
254                return this.dataIn.readUnsignedByte();
255            } catch (EOFException e) {
256                throw JMSExceptionSupport.createMessageEOFException(e);
257            } catch (IOException e) {
258                throw JMSExceptionSupport.createMessageFormatException(e);
259            }
260        }
261    
262        /**
263         * Reads a signed 16-bit number from the bytes message stream.
264         *
265         * @return the next two bytes from the bytes message stream, interpreted as
266         *         a signed 16-bit number
267         * @throws JMSException if the JMS provider fails to read the message due to
268         *                 some internal error.
269         * @throws MessageEOFException if unexpected end of bytes stream has been
270         *                 reached.
271         * @throws MessageNotReadableException if the message is in write-only mode.
272         */
273        @Override
274        public short readShort() throws JMSException {
275            initializeReading();
276            try {
277                return this.dataIn.readShort();
278            } catch (EOFException e) {
279                throw JMSExceptionSupport.createMessageEOFException(e);
280            } catch (IOException e) {
281                throw JMSExceptionSupport.createMessageFormatException(e);
282            }
283        }
284    
285        /**
286         * Reads an unsigned 16-bit number from the bytes message stream.
287         *
288         * @return the next two bytes from the bytes message stream, interpreted as
289         *         an unsigned 16-bit integer
290         * @throws JMSException if the JMS provider fails to read the message due to
291         *                 some internal error.
292         * @throws MessageEOFException if unexpected end of bytes stream has been
293         *                 reached.
294         * @throws MessageNotReadableException if the message is in write-only mode.
295         */
296        @Override
297        public int readUnsignedShort() throws JMSException {
298            initializeReading();
299            try {
300                return this.dataIn.readUnsignedShort();
301            } catch (EOFException e) {
302                throw JMSExceptionSupport.createMessageEOFException(e);
303            } catch (IOException e) {
304                throw JMSExceptionSupport.createMessageFormatException(e);
305            }
306        }
307    
308        /**
309         * Reads a Unicode character value from the bytes message stream.
310         *
311         * @return the next two bytes from the bytes message stream as a Unicode
312         *         character
313         * @throws JMSException if the JMS provider fails to read the message due to
314         *                 some internal error.
315         * @throws MessageEOFException if unexpected end of bytes stream has been
316         *                 reached.
317         * @throws MessageNotReadableException if the message is in write-only mode.
318         */
319        @Override
320        public char readChar() throws JMSException {
321            initializeReading();
322            try {
323                return this.dataIn.readChar();
324            } catch (EOFException e) {
325                throw JMSExceptionSupport.createMessageEOFException(e);
326            } catch (IOException e) {
327                throw JMSExceptionSupport.createMessageFormatException(e);
328            }
329        }
330    
331        /**
332         * Reads a signed 32-bit integer from the bytes message stream.
333         *
334         * @return the next four bytes from the bytes message stream, interpreted as
335         *         an <code>int</code>
336         * @throws JMSException if the JMS provider fails to read the message due to
337         *                 some internal error.
338         * @throws MessageEOFException if unexpected end of bytes stream has been
339         *                 reached.
340         * @throws MessageNotReadableException if the message is in write-only mode.
341         */
342        @Override
343        public int readInt() throws JMSException {
344            initializeReading();
345            try {
346                return this.dataIn.readInt();
347            } catch (EOFException e) {
348                throw JMSExceptionSupport.createMessageEOFException(e);
349            } catch (IOException e) {
350                throw JMSExceptionSupport.createMessageFormatException(e);
351            }
352        }
353    
354        /**
355         * Reads a signed 64-bit integer from the bytes message stream.
356         *
357         * @return the next eight bytes from the bytes message stream, interpreted
358         *         as a <code>long</code>
359         * @throws JMSException if the JMS provider fails to read the message due to
360         *                 some internal error.
361         * @throws MessageEOFException if unexpected end of bytes stream has been
362         *                 reached.
363         * @throws MessageNotReadableException if the message is in write-only mode.
364         */
365        @Override
366        public long readLong() throws JMSException {
367            initializeReading();
368            try {
369                return this.dataIn.readLong();
370            } catch (EOFException e) {
371                throw JMSExceptionSupport.createMessageEOFException(e);
372            } catch (IOException e) {
373                throw JMSExceptionSupport.createMessageFormatException(e);
374            }
375        }
376    
377        /**
378         * Reads a <code>float</code> from the bytes message stream.
379         *
380         * @return the next four bytes from the bytes message stream, interpreted as
381         *         a <code>float</code>
382         * @throws JMSException if the JMS provider fails to read the message due to
383         *                 some internal error.
384         * @throws MessageEOFException if unexpected end of bytes stream has been
385         *                 reached.
386         * @throws MessageNotReadableException if the message is in write-only mode.
387         */
388        @Override
389        public float readFloat() throws JMSException {
390            initializeReading();
391            try {
392                return this.dataIn.readFloat();
393            } catch (EOFException e) {
394                throw JMSExceptionSupport.createMessageEOFException(e);
395            } catch (IOException e) {
396                throw JMSExceptionSupport.createMessageFormatException(e);
397            }
398        }
399    
400        /**
401         * Reads a <code>double</code> from the bytes message stream.
402         *
403         * @return the next eight bytes from the bytes message stream, interpreted
404         *         as a <code>double</code>
405         * @throws JMSException if the JMS provider fails to read the message due to
406         *                 some internal error.
407         * @throws MessageEOFException if unexpected end of bytes stream has been
408         *                 reached.
409         * @throws MessageNotReadableException if the message is in write-only mode.
410         */
411        @Override
412        public double readDouble() throws JMSException {
413            initializeReading();
414            try {
415                return this.dataIn.readDouble();
416            } catch (EOFException e) {
417                throw JMSExceptionSupport.createMessageEOFException(e);
418            } catch (IOException e) {
419                throw JMSExceptionSupport.createMessageFormatException(e);
420            }
421        }
422    
423        /**
424         * Reads a string that has been encoded using a modified UTF-8 format from
425         * the bytes message stream.
426         * <P>
427         * For more information on the UTF-8 format, see "File System Safe UCS
428         * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
429         * X/Open Company Ltd., Document Number: P316. This information also appears
430         * in ISO/IEC 10646, Annex P.
431         *
432         * @return a Unicode string from the bytes message stream
433         * @throws JMSException if the JMS provider fails to read the message due to
434         *                 some internal error.
435         * @throws MessageEOFException if unexpected end of bytes stream has been
436         *                 reached.
437         * @throws MessageNotReadableException if the message is in write-only mode.
438         */
439        @Override
440        public String readUTF() throws JMSException {
441            initializeReading();
442            try {
443                return this.dataIn.readUTF();
444            } catch (EOFException e) {
445                throw JMSExceptionSupport.createMessageEOFException(e);
446            } catch (IOException e) {
447                throw JMSExceptionSupport.createMessageFormatException(e);
448            }
449        }
450    
451        /**
452         * Reads a byte array from the bytes message stream.
453         * <P>
454         * If the length of array <code>value</code> is less than the number of
455         * bytes remaining to be read from the stream, the array should be filled. A
456         * subsequent call reads the next increment, and so on.
457         * <P>
458         * If the number of bytes remaining in the stream is less than the length of
459         * array <code>value</code>, the bytes should be read into the array. The
460         * return value of the total number of bytes read will be less than the
461         * length of the array, indicating that there are no more bytes left to be
462         * read from the stream. The next read of the stream returns -1.
463         *
464         * @param value the buffer into which the data is read
465         * @return the total number of bytes read into the buffer, or -1 if there is
466         *         no more data because the end of the stream has been reached
467         * @throws JMSException if the JMS provider fails to read the message due to
468         *                 some internal error.
469         * @throws MessageNotReadableException if the message is in write-only mode.
470         */
471        @Override
472        public int readBytes(byte[] value) throws JMSException {
473            return readBytes(value, value.length);
474        }
475    
476        /**
477         * Reads a portion of the bytes message stream.
478         * <P>
479         * If the length of array <code>value</code> is less than the number of
480         * bytes remaining to be read from the stream, the array should be filled. A
481         * subsequent call reads the next increment, and so on.
482         * <P>
483         * If the number of bytes remaining in the stream is less than the length of
484         * array <code>value</code>, the bytes should be read into the array. The
485         * return value of the total number of bytes read will be less than the
486         * length of the array, indicating that there are no more bytes left to be
487         * read from the stream. The next read of the stream returns -1. <p/> If
488         * <code>length</code> is negative, or <code>length</code> is greater
489         * than the length of the array <code>value</code>, then an
490         * <code>IndexOutOfBoundsException</code> is thrown. No bytes will be read
491         * from the stream for this exception case.
492         *
493         * @param value the buffer into which the data is read
494         * @param length the number of bytes to read; must be less than or equal to
495         *                <code>value.length</code>
496         * @return the total number of bytes read into the buffer, or -1 if there is
497         *         no more data because the end of the stream has been reached
498         * @throws JMSException if the JMS provider fails to read the message due to
499         *                 some internal error.
500         * @throws MessageNotReadableException if the message is in write-only mode.
501         */
502        @Override
503        public int readBytes(byte[] value, int length) throws JMSException {
504            initializeReading();
505            try {
506                int n = 0;
507                while (n < length) {
508                    int count = this.dataIn.read(value, n, length - n);
509                    if (count < 0) {
510                        break;
511                    }
512                    n += count;
513                }
514                if (n == 0 && length > 0) {
515                    n = -1;
516                }
517                return n;
518            } catch (EOFException e) {
519                throw JMSExceptionSupport.createMessageEOFException(e);
520            } catch (IOException e) {
521                throw JMSExceptionSupport.createMessageFormatException(e);
522            }
523        }
524    
525        /**
526         * Writes a <code>boolean</code> to the bytes message stream as a 1-byte
527         * value. The value <code>true</code> is written as the value
528         * <code>(byte)1</code>; the value <code>false</code> is written as the
529         * value <code>(byte)0</code>.
530         *
531         * @param value the <code>boolean</code> value to be written
532         * @throws JMSException if the JMS provider fails to write the message due
533         *                 to some internal error.
534         * @throws MessageNotWriteableException if the message is in read-only mode.
535         */
536        @Override
537        public void writeBoolean(boolean value) throws JMSException {
538            initializeWriting();
539            try {
540                this.dataOut.writeBoolean(value);
541            } catch (IOException ioe) {
542                throw JMSExceptionSupport.create(ioe);
543            }
544        }
545    
546        /**
547         * Writes a <code>byte</code> to the bytes message stream as a 1-byte
548         * value.
549         *
550         * @param value the <code>byte</code> value to be written
551         * @throws JMSException if the JMS provider fails to write the message due
552         *                 to some internal error.
553         * @throws MessageNotWriteableException if the message is in read-only mode.
554         */
555        @Override
556        public void writeByte(byte value) throws JMSException {
557            initializeWriting();
558            try {
559                this.dataOut.writeByte(value);
560            } catch (IOException ioe) {
561                throw JMSExceptionSupport.create(ioe);
562            }
563        }
564    
565        /**
566         * Writes a <code>short</code> to the bytes message stream as two bytes,
567         * high byte first.
568         *
569         * @param value the <code>short</code> to be written
570         * @throws JMSException if the JMS provider fails to write the message due
571         *                 to some internal error.
572         * @throws MessageNotWriteableException if the message is in read-only mode.
573         */
574        @Override
575        public void writeShort(short value) throws JMSException {
576            initializeWriting();
577            try {
578                this.dataOut.writeShort(value);
579            } catch (IOException ioe) {
580                throw JMSExceptionSupport.create(ioe);
581            }
582        }
583    
584        /**
585         * Writes a <code>char</code> to the bytes message stream as a 2-byte
586         * value, high byte first.
587         *
588         * @param value the <code>char</code> value to be written
589         * @throws JMSException if the JMS provider fails to write the message due
590         *                 to some internal error.
591         * @throws MessageNotWriteableException if the message is in read-only mode.
592         */
593        @Override
594        public void writeChar(char value) throws JMSException {
595            initializeWriting();
596            try {
597                this.dataOut.writeChar(value);
598            } catch (IOException ioe) {
599                throw JMSExceptionSupport.create(ioe);
600            }
601        }
602    
603        /**
604         * Writes an <code>int</code> to the bytes message stream as four bytes,
605         * high byte first.
606         *
607         * @param value the <code>int</code> to be written
608         * @throws JMSException if the JMS provider fails to write the message due
609         *                 to some internal error.
610         * @throws MessageNotWriteableException if the message is in read-only mode.
611         */
612        @Override
613        public void writeInt(int value) throws JMSException {
614            initializeWriting();
615            try {
616                this.dataOut.writeInt(value);
617            } catch (IOException ioe) {
618                throw JMSExceptionSupport.create(ioe);
619            }
620        }
621    
622        /**
623         * Writes a <code>long</code> to the bytes message stream as eight bytes,
624         * high byte first.
625         *
626         * @param value the <code>long</code> to be written
627         * @throws JMSException if the JMS provider fails to write the message due
628         *                 to some internal error.
629         * @throws MessageNotWriteableException if the message is in read-only mode.
630         */
631        @Override
632        public void writeLong(long value) throws JMSException {
633            initializeWriting();
634            try {
635                this.dataOut.writeLong(value);
636            } catch (IOException ioe) {
637                throw JMSExceptionSupport.create(ioe);
638            }
639        }
640    
641        /**
642         * Converts the <code>float</code> argument to an <code>int</code> using
643         * the <code>floatToIntBits</code> method in class <code>Float</code>,
644         * and then writes that <code>int</code> value to the bytes message stream
645         * as a 4-byte quantity, high byte first.
646         *
647         * @param value the <code>float</code> value to be written
648         * @throws JMSException if the JMS provider fails to write the message due
649         *                 to some internal error.
650         * @throws MessageNotWriteableException if the message is in read-only mode.
651         */
652        @Override
653        public void writeFloat(float value) throws JMSException {
654            initializeWriting();
655            try {
656                this.dataOut.writeFloat(value);
657            } catch (IOException ioe) {
658                throw JMSExceptionSupport.create(ioe);
659            }
660        }
661    
662        /**
663         * Converts the <code>double</code> argument to a <code>long</code>
664         * using the <code>doubleToLongBits</code> method in class
665         * <code>Double</code>, and then writes that <code>long</code> value to
666         * the bytes message stream as an 8-byte quantity, high byte first.
667         *
668         * @param value the <code>double</code> value to be written
669         * @throws JMSException if the JMS provider fails to write the message due
670         *                 to some internal error.
671         * @throws MessageNotWriteableException if the message is in read-only mode.
672         */
673        @Override
674        public void writeDouble(double value) throws JMSException {
675            initializeWriting();
676            try {
677                this.dataOut.writeDouble(value);
678            } catch (IOException ioe) {
679                throw JMSExceptionSupport.create(ioe);
680            }
681        }
682    
683        /**
684         * Writes a string to the bytes message stream using UTF-8 encoding in a
685         * machine-independent manner.
686         * <P>
687         * For more information on the UTF-8 format, see "File System Safe UCS
688         * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
689         * X/Open Company Ltd., Document Number: P316. This information also appears
690         * in ISO/IEC 10646, Annex P.
691         *
692         * @param value the <code>String</code> value to be written
693         * @throws JMSException if the JMS provider fails to write the message due
694         *                 to some internal error.
695         * @throws MessageNotWriteableException if the message is in read-only mode.
696         */
697        @Override
698        public void writeUTF(String value) throws JMSException {
699            initializeWriting();
700            try {
701                this.dataOut.writeUTF(value);
702            } catch (IOException ioe) {
703                throw JMSExceptionSupport.create(ioe);
704            }
705        }
706    
707        /**
708         * Writes a byte array to the bytes message stream.
709         *
710         * @param value the byte array to be written
711         * @throws JMSException if the JMS provider fails to write the message due
712         *                 to some internal error.
713         * @throws MessageNotWriteableException if the message is in read-only mode.
714         */
715        @Override
716        public void writeBytes(byte[] value) throws JMSException {
717            initializeWriting();
718            try {
719                this.dataOut.write(value);
720            } catch (IOException ioe) {
721                throw JMSExceptionSupport.create(ioe);
722            }
723        }
724    
725        /**
726         * Writes a portion of a byte array to the bytes message stream.
727         *
728         * @param value the byte array value to be written
729         * @param offset the initial offset within the byte array
730         * @param length the number of bytes to use
731         * @throws JMSException if the JMS provider fails to write the message due
732         *                 to some internal error.
733         * @throws MessageNotWriteableException if the message is in read-only mode.
734         */
735        @Override
736        public void writeBytes(byte[] value, int offset, int length) throws JMSException {
737            initializeWriting();
738            try {
739                this.dataOut.write(value, offset, length);
740            } catch (IOException ioe) {
741                throw JMSExceptionSupport.create(ioe);
742            }
743        }
744    
745        /**
746         * Writes an object to the bytes message stream.
747         * <P>
748         * This method works only for the objectified primitive object types (<code>Integer</code>,<code>Double</code>,
749         * <code>Long</code> &nbsp;...), <code>String</code> objects, and byte
750         * arrays.
751         *
752         * @param value the object in the Java programming language ("Java object")
753         *                to be written; it must not be null
754         * @throws JMSException if the JMS provider fails to write the message due
755         *                 to some internal error.
756         * @throws MessageFormatException if the object is of an invalid type.
757         * @throws MessageNotWriteableException if the message is in read-only mode.
758         * @throws java.lang.NullPointerException if the parameter
759         *                 <code>value</code> is null.
760         */
761        @Override
762        public void writeObject(Object value) throws JMSException {
763            if (value == null) {
764                throw new NullPointerException();
765            }
766            initializeWriting();
767            if (value instanceof Boolean) {
768                writeBoolean(((Boolean)value).booleanValue());
769            } else if (value instanceof Character) {
770                writeChar(((Character)value).charValue());
771            } else if (value instanceof Byte) {
772                writeByte(((Byte)value).byteValue());
773            } else if (value instanceof Short) {
774                writeShort(((Short)value).shortValue());
775            } else if (value instanceof Integer) {
776                writeInt(((Integer)value).intValue());
777            } else if (value instanceof Long) {
778                writeLong(((Long)value).longValue());
779            } else if (value instanceof Float) {
780                writeFloat(((Float)value).floatValue());
781            } else if (value instanceof Double) {
782                writeDouble(((Double)value).doubleValue());
783            } else if (value instanceof String) {
784                writeUTF(value.toString());
785            } else if (value instanceof byte[]) {
786                writeBytes((byte[])value);
787            } else {
788                throw new MessageFormatException("Cannot write non-primitive type:" + value.getClass());
789            }
790        }
791    
792        /**
793         * Puts the message body in read-only mode and repositions the stream of
794         * bytes to the beginning.
795         *
796         * @throws JMSException if an internal error occurs
797         */
798        @Override
799        public void reset() throws JMSException {
800            storeContent();
801            this.bytesOut = null;
802            if (dataIn != null) {
803                try {
804                    // Eagerly release potential Inflater memory buffers.
805                    dataIn.close();
806                } catch (Exception e) {
807                }
808            }
809            this.dataIn = null;
810            this.dataOut = null;
811            setReadOnlyBody(true);
812        }
813    
814        private void initializeWriting() throws JMSException {
815            checkReadOnlyBody();
816            if (this.dataOut == null) {
817                this.bytesOut = new ByteArrayOutputStream();
818                OutputStream os = bytesOut;
819                ActiveMQConnection connection = getConnection();
820                if (connection != null && connection.isUseCompression()) {
821                    // keep track of the real length of the content if
822                    // we are compressed.
823                    try {
824                        os.write(new byte[4]);
825                    } catch (IOException e) {
826                        throw JMSExceptionSupport.create(e);
827                    }
828                    length = 0;
829                    compressed = true;
830                    final Deflater deflater = new Deflater(Deflater.BEST_SPEED);
831                    os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
832                        @Override
833                        public void write(byte[] arg0) throws IOException {
834                            length += arg0.length;
835                            out.write(arg0);
836                        }
837    
838                        @Override
839                        public void write(byte[] arg0, int arg1, int arg2) throws IOException {
840                            length += arg2;
841                            out.write(arg0, arg1, arg2);
842                        }
843    
844                        @Override
845                        public void write(int arg0) throws IOException {
846                            length++;
847                            out.write(arg0);
848                        }
849    
850                        @Override
851                        public void close() throws IOException {
852                            super.close();
853                            deflater.end();
854                        }
855                    };
856                }
857                this.dataOut = new DataOutputStream(os);
858            }
859        }
860    
861        protected void checkWriteOnlyBody() throws MessageNotReadableException {
862            if (!readOnlyBody) {
863                throw new MessageNotReadableException("Message body is write-only");
864            }
865        }
866    
867        private void initializeReading() throws JMSException {
868            checkWriteOnlyBody();
869            if (dataIn == null) {
870                ByteSequence data = getContent();
871                if (data == null) {
872                    data = new ByteSequence(new byte[] {}, 0, 0);
873                }
874                InputStream is = new ByteArrayInputStream(data);
875                if (isCompressed()) {
876                    // keep track of the real length of the content if
877                    // we are compressed.
878                    try {
879                        DataInputStream dis = new DataInputStream(is);
880                        length = dis.readInt();
881                        dis.close();
882                    } catch (IOException e) {
883                        throw JMSExceptionSupport.create(e);
884                    }
885                    is = new InflaterInputStream(is);
886                } else {
887                    length = data.getLength();
888                }
889                dataIn = new DataInputStream(is);
890            }
891        }
892    
893        @Override
894        public void setObjectProperty(String name, Object value) throws JMSException {
895            initializeWriting();
896            super.setObjectProperty(name, value);
897        }
898    
899        @Override
900        public String toString() {
901            return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
902        }
903    
904        @Override
905        protected void doCompress() throws IOException {
906            compressed = true;
907            ByteSequence bytes = getContent();
908            int length = bytes.getLength();
909            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
910            bytesOut.write(new byte[4]);
911            DeflaterOutputStream os = new DeflaterOutputStream(bytesOut);
912            DataOutputStream dataOut = new DataOutputStream(os);
913            dataOut.write(bytes.data, bytes.offset, bytes.length);
914            dataOut.flush();
915            dataOut.close();
916            bytes = bytesOut.toByteSequence();
917            ByteSequenceData.writeIntBig(bytes, length);
918            bytes.offset = 0;
919            setContent(bytes);
920        }
921    
922        @Override
923        protected void finalize() throws Throwable {
924            // Attempt to do eager close in case of compressed data which uses a
925            // wrapped InflaterInputStream.
926            if (dataIn != null) {
927                try {
928                    dataIn.close();
929                } catch(Exception e) {
930                }
931            }
932        }
933    }