001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.EOFException;
022    import java.io.FilterOutputStream;
023    import java.io.IOException;
024    import java.io.InputStream;
025    import java.io.OutputStream;
026    import java.util.zip.Deflater;
027    import java.util.zip.DeflaterOutputStream;
028    import java.util.zip.InflaterInputStream;
029    
030    import javax.jms.BytesMessage;
031    import javax.jms.JMSException;
032    import javax.jms.MessageFormatException;
033    import javax.jms.MessageNotReadableException;
034    
035    import org.apache.activemq.ActiveMQConnection;
036    import org.apache.activemq.util.ByteArrayInputStream;
037    import org.apache.activemq.util.ByteArrayOutputStream;
038    import org.apache.activemq.util.ByteSequence;
039    import org.apache.activemq.util.ByteSequenceData;
040    import org.apache.activemq.util.JMSExceptionSupport;
041    
042    /**
043     * A <CODE>BytesMessage</CODE> object is used to send a message containing a
044     * stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE>
045     * interface and adds a bytes message body. The receiver of the message supplies
046     * the interpretation of the bytes.
047     * <P>
048     * The <CODE>BytesMessage</CODE> methods are based largely on those found in
049     * <CODE>java.io.DataInputStream</CODE> and
050     * <CODE>java.io.DataOutputStream</CODE>.
051     * <P>
052     * This message type is for client encoding of existing message formats. If
053     * possible, one of the other self-defining message types should be used
054     * instead.
055     * <P>
056     * Although the JMS API allows the use of message properties with byte messages,
057     * they are typically not used, since the inclusion of properties may affect the
058     * format.
059     * <P>
060     * The primitive types can be written explicitly using methods for each type.
061     * They may also be written generically as objects. For instance, a call to
062     * <CODE>BytesMessage.writeInt(6)</CODE> is equivalent to
063     * <CODE> BytesMessage.writeObject(new Integer(6))</CODE>. Both forms are
064     * provided, because the explicit form is convenient for static programming, and
065     * the object form is needed when types are not known at compile time.
066     * <P>
067     * When the message is first created, and when <CODE>clearBody</CODE> is
068     * called, the body of the message is in write-only mode. After the first call
069     * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
070     * After a message has been sent, the client that sent it can retain and modify
071     * it without affecting the message that has been sent. The same message object
072     * can be sent multiple times. When a message has been received, the provider
073     * has called <CODE>reset</CODE> so that the message body is in read-only mode
074     * for the client.
075     * <P>
076     * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
077     * message body is cleared and the message is in write-only mode.
078     * <P>
079     * If a client attempts to read a message in write-only mode, a
080     * <CODE>MessageNotReadableException</CODE> is thrown.
081     * <P>
082     * If a client attempts to write a message in read-only mode, a
083     * <CODE>MessageNotWriteableException</CODE> is thrown.
084     *
085     * @openwire:marshaller code=24
086     * @see javax.jms.Session#createBytesMessage()
087     * @see javax.jms.MapMessage
088     * @see javax.jms.Message
089     * @see javax.jms.ObjectMessage
090     * @see javax.jms.StreamMessage
091     * @see javax.jms.TextMessage
092     */
093    public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage {
094    
095        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;
096    
097        protected transient DataOutputStream dataOut;
098        protected transient ByteArrayOutputStream bytesOut;
099        protected transient DataInputStream dataIn;
100        protected transient int length;
101    
102        public Message copy() {
103            ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
104            copy(copy);
105            return copy;
106        }
107    
108        private void copy(ActiveMQBytesMessage copy) {
109            storeContent();
110            super.copy(copy);
111            copy.dataOut = null;
112            copy.bytesOut = null;
113            copy.dataIn = null;
114        }
115    
116        public void onSend() throws JMSException {
117            super.onSend();
118            storeContent();
119        }
120    
121        @Override
122        public void storeContent() {
123            try {
124                if (dataOut != null) {
125                    dataOut.close();
126                    ByteSequence bs = bytesOut.toByteSequence();
127                    if (compressed) {
128                        int pos = bs.offset;
129                        ByteSequenceData.writeIntBig(bs, length);
130                        bs.offset = pos;
131                    }
132                    setContent(bs);
133                    bytesOut = null;
134                    dataOut = null;
135                }
136            } catch (IOException ioe) {
137                throw new RuntimeException(ioe.getMessage(), ioe); // TODO verify
138                                                                    // RuntimeException
139            }
140        }
141    
142        public byte getDataStructureType() {
143            return DATA_STRUCTURE_TYPE;
144        }
145    
146        public String getJMSXMimeType() {
147            return "jms/bytes-message";
148        }
149    
150        /**
151         * Clears out the message body. Clearing a message's body does not clear its
152         * header values or property entries.
153         * <P>
154         * If this message body was read-only, calling this method leaves the
155         * message body in the same state as an empty body in a newly created
156         * message.
157         *
158         * @throws JMSException if the JMS provider fails to clear the message body
159         *                 due to some internal error.
160         */
161        public void clearBody() throws JMSException {
162            super.clearBody();
163            this.dataOut = null;
164            this.dataIn = null;
165            this.bytesOut = null;
166        }
167    
168        /**
169         * Gets the number of bytes of the message body when the message is in
170         * read-only mode. The value returned can be used to allocate a byte array.
171         * The value returned is the entire length of the message body, regardless
172         * of where the pointer for reading the message is currently located.
173         *
174         * @return number of bytes in the message
175         * @throws JMSException if the JMS provider fails to read the message due to
176         *                 some internal error.
177         * @throws MessageNotReadableException if the message is in write-only mode.
178         * @since 1.1
179         */
180    
181        public long getBodyLength() throws JMSException {
182            initializeReading();
183            return length;
184        }
185    
186        /**
187         * Reads a <code>boolean</code> from the bytes message stream.
188         *
189         * @return the <code>boolean</code> value read
190         * @throws JMSException if the JMS provider fails to read the message due to
191         *                 some internal error.
192         * @throws MessageEOFException if unexpected end of bytes stream has been
193         *                 reached.
194         * @throws MessageNotReadableException if the message is in write-only mode.
195         */
196        public boolean readBoolean() throws JMSException {
197            initializeReading();
198            try {
199                return this.dataIn.readBoolean();
200            } catch (EOFException e) {
201                throw JMSExceptionSupport.createMessageEOFException(e);
202            } catch (IOException e) {
203                throw JMSExceptionSupport.createMessageFormatException(e);
204            }
205        }
206    
207        /**
208         * Reads a signed 8-bit value from the bytes message stream.
209         *
210         * @return the next byte from the bytes message stream as a signed 8-bit
211         *         <code>byte</code>
212         * @throws JMSException if the JMS provider fails to read the message due to
213         *                 some internal error.
214         * @throws MessageEOFException if unexpected end of bytes stream has been
215         *                 reached.
216         * @throws MessageNotReadableException if the message is in write-only mode.
217         */
218        public byte readByte() throws JMSException {
219            initializeReading();
220            try {
221                return this.dataIn.readByte();
222            } catch (EOFException e) {
223                throw JMSExceptionSupport.createMessageEOFException(e);
224            } catch (IOException e) {
225                throw JMSExceptionSupport.createMessageFormatException(e);
226            }
227        }
228    
229        /**
230         * Reads an unsigned 8-bit number from the bytes message stream.
231         *
232         * @return the next byte from the bytes message stream, interpreted as an
233         *         unsigned 8-bit number
234         * @throws JMSException if the JMS provider fails to read the message due to
235         *                 some internal error.
236         * @throws MessageEOFException if unexpected end of bytes stream has been
237         *                 reached.
238         * @throws MessageNotReadableException if the message is in write-only mode.
239         */
240        public int readUnsignedByte() throws JMSException {
241            initializeReading();
242            try {
243                return this.dataIn.readUnsignedByte();
244            } catch (EOFException e) {
245                throw JMSExceptionSupport.createMessageEOFException(e);
246            } catch (IOException e) {
247                throw JMSExceptionSupport.createMessageFormatException(e);
248            }
249        }
250    
251        /**
252         * Reads a signed 16-bit number from the bytes message stream.
253         *
254         * @return the next two bytes from the bytes message stream, interpreted as
255         *         a signed 16-bit number
256         * @throws JMSException if the JMS provider fails to read the message due to
257         *                 some internal error.
258         * @throws MessageEOFException if unexpected end of bytes stream has been
259         *                 reached.
260         * @throws MessageNotReadableException if the message is in write-only mode.
261         */
262        public short readShort() throws JMSException {
263            initializeReading();
264            try {
265                return this.dataIn.readShort();
266            } catch (EOFException e) {
267                throw JMSExceptionSupport.createMessageEOFException(e);
268            } catch (IOException e) {
269                throw JMSExceptionSupport.createMessageFormatException(e);
270            }
271        }
272    
273        /**
274         * Reads an unsigned 16-bit number from the bytes message stream.
275         *
276         * @return the next two bytes from the bytes message stream, interpreted as
277         *         an unsigned 16-bit integer
278         * @throws JMSException if the JMS provider fails to read the message due to
279         *                 some internal error.
280         * @throws MessageEOFException if unexpected end of bytes stream has been
281         *                 reached.
282         * @throws MessageNotReadableException if the message is in write-only mode.
283         */
284        public int readUnsignedShort() throws JMSException {
285            initializeReading();
286            try {
287                return this.dataIn.readUnsignedShort();
288            } catch (EOFException e) {
289                throw JMSExceptionSupport.createMessageEOFException(e);
290            } catch (IOException e) {
291                throw JMSExceptionSupport.createMessageFormatException(e);
292            }
293        }
294    
295        /**
296         * Reads a Unicode character value from the bytes message stream.
297         *
298         * @return the next two bytes from the bytes message stream as a Unicode
299         *         character
300         * @throws JMSException if the JMS provider fails to read the message due to
301         *                 some internal error.
302         * @throws MessageEOFException if unexpected end of bytes stream has been
303         *                 reached.
304         * @throws MessageNotReadableException if the message is in write-only mode.
305         */
306        public char readChar() throws JMSException {
307            initializeReading();
308            try {
309                return this.dataIn.readChar();
310            } catch (EOFException e) {
311                throw JMSExceptionSupport.createMessageEOFException(e);
312            } catch (IOException e) {
313                throw JMSExceptionSupport.createMessageFormatException(e);
314            }
315        }
316    
317        /**
318         * Reads a signed 32-bit integer from the bytes message stream.
319         *
320         * @return the next four bytes from the bytes message stream, interpreted as
321         *         an <code>int</code>
322         * @throws JMSException if the JMS provider fails to read the message due to
323         *                 some internal error.
324         * @throws MessageEOFException if unexpected end of bytes stream has been
325         *                 reached.
326         * @throws MessageNotReadableException if the message is in write-only mode.
327         */
328        public int readInt() throws JMSException {
329            initializeReading();
330            try {
331                return this.dataIn.readInt();
332            } catch (EOFException e) {
333                throw JMSExceptionSupport.createMessageEOFException(e);
334            } catch (IOException e) {
335                throw JMSExceptionSupport.createMessageFormatException(e);
336            }
337        }
338    
339        /**
340         * Reads a signed 64-bit integer from the bytes message stream.
341         *
342         * @return the next eight bytes from the bytes message stream, interpreted
343         *         as a <code>long</code>
344         * @throws JMSException if the JMS provider fails to read the message due to
345         *                 some internal error.
346         * @throws MessageEOFException if unexpected end of bytes stream has been
347         *                 reached.
348         * @throws MessageNotReadableException if the message is in write-only mode.
349         */
350        public long readLong() throws JMSException {
351            initializeReading();
352            try {
353                return this.dataIn.readLong();
354            } catch (EOFException e) {
355                throw JMSExceptionSupport.createMessageEOFException(e);
356            } catch (IOException e) {
357                throw JMSExceptionSupport.createMessageFormatException(e);
358            }
359        }
360    
361        /**
362         * Reads a <code>float</code> from the bytes message stream.
363         *
364         * @return the next four bytes from the bytes message stream, interpreted as
365         *         a <code>float</code>
366         * @throws JMSException if the JMS provider fails to read the message due to
367         *                 some internal error.
368         * @throws MessageEOFException if unexpected end of bytes stream has been
369         *                 reached.
370         * @throws MessageNotReadableException if the message is in write-only mode.
371         */
372        public float readFloat() throws JMSException {
373            initializeReading();
374            try {
375                return this.dataIn.readFloat();
376            } catch (EOFException e) {
377                throw JMSExceptionSupport.createMessageEOFException(e);
378            } catch (IOException e) {
379                throw JMSExceptionSupport.createMessageFormatException(e);
380            }
381        }
382    
383        /**
384         * Reads a <code>double</code> from the bytes message stream.
385         *
386         * @return the next eight bytes from the bytes message stream, interpreted
387         *         as a <code>double</code>
388         * @throws JMSException if the JMS provider fails to read the message due to
389         *                 some internal error.
390         * @throws MessageEOFException if unexpected end of bytes stream has been
391         *                 reached.
392         * @throws MessageNotReadableException if the message is in write-only mode.
393         */
394        public double readDouble() throws JMSException {
395            initializeReading();
396            try {
397                return this.dataIn.readDouble();
398            } catch (EOFException e) {
399                throw JMSExceptionSupport.createMessageEOFException(e);
400            } catch (IOException e) {
401                throw JMSExceptionSupport.createMessageFormatException(e);
402            }
403        }
404    
405        /**
406         * Reads a string that has been encoded using a modified UTF-8 format from
407         * the bytes message stream.
408         * <P>
409         * For more information on the UTF-8 format, see "File System Safe UCS
410         * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
411         * X/Open Company Ltd., Document Number: P316. This information also appears
412         * in ISO/IEC 10646, Annex P.
413         *
414         * @return a Unicode string from the bytes message stream
415         * @throws JMSException if the JMS provider fails to read the message due to
416         *                 some internal error.
417         * @throws MessageEOFException if unexpected end of bytes stream has been
418         *                 reached.
419         * @throws MessageNotReadableException if the message is in write-only mode.
420         */
421        public String readUTF() throws JMSException {
422            initializeReading();
423            try {
424                return this.dataIn.readUTF();
425            } catch (EOFException e) {
426                throw JMSExceptionSupport.createMessageEOFException(e);
427            } catch (IOException e) {
428                throw JMSExceptionSupport.createMessageFormatException(e);
429            }
430        }
431    
432        /**
433         * Reads a byte array from the bytes message stream.
434         * <P>
435         * If the length of array <code>value</code> is less than the number of
436         * bytes remaining to be read from the stream, the array should be filled. A
437         * subsequent call reads the next increment, and so on.
438         * <P>
439         * If the number of bytes remaining in the stream is less than the length of
440         * array <code>value</code>, the bytes should be read into the array. The
441         * return value of the total number of bytes read will be less than the
442         * length of the array, indicating that there are no more bytes left to be
443         * read from the stream. The next read of the stream returns -1.
444         *
445         * @param value the buffer into which the data is read
446         * @return the total number of bytes read into the buffer, or -1 if there is
447         *         no more data because the end of the stream has been reached
448         * @throws JMSException if the JMS provider fails to read the message due to
449         *                 some internal error.
450         * @throws MessageNotReadableException if the message is in write-only mode.
451         */
452        public int readBytes(byte[] value) throws JMSException {
453            return readBytes(value, value.length);
454        }
455    
456        /**
457         * Reads a portion of the bytes message stream.
458         * <P>
459         * If the length of array <code>value</code> is less than the number of
460         * bytes remaining to be read from the stream, the array should be filled. A
461         * subsequent call reads the next increment, and so on.
462         * <P>
463         * If the number of bytes remaining in the stream is less than the length of
464         * array <code>value</code>, the bytes should be read into the array. The
465         * return value of the total number of bytes read will be less than the
466         * length of the array, indicating that there are no more bytes left to be
467         * read from the stream. The next read of the stream returns -1. <p/> If
468         * <code>length</code> is negative, or <code>length</code> is greater
469         * than the length of the array <code>value</code>, then an
470         * <code>IndexOutOfBoundsException</code> is thrown. No bytes will be read
471         * from the stream for this exception case.
472         *
473         * @param value the buffer into which the data is read
474         * @param length the number of bytes to read; must be less than or equal to
475         *                <code>value.length</code>
476         * @return the total number of bytes read into the buffer, or -1 if there is
477         *         no more data because the end of the stream has been reached
478         * @throws JMSException if the JMS provider fails to read the message due to
479         *                 some internal error.
480         * @throws MessageNotReadableException if the message is in write-only mode.
481         */
482        public int readBytes(byte[] value, int length) throws JMSException {
483            initializeReading();
484            try {
485                int n = 0;
486                while (n < length) {
487                    int count = this.dataIn.read(value, n, length - n);
488                    if (count < 0) {
489                        break;
490                    }
491                    n += count;
492                }
493                if (n == 0 && length > 0) {
494                    n = -1;
495                }
496                return n;
497            } catch (EOFException e) {
498                throw JMSExceptionSupport.createMessageEOFException(e);
499            } catch (IOException e) {
500                throw JMSExceptionSupport.createMessageFormatException(e);
501            }
502        }
503    
504        /**
505         * Writes a <code>boolean</code> to the bytes message stream as a 1-byte
506         * value. The value <code>true</code> is written as the value
507         * <code>(byte)1</code>; the value <code>false</code> is written as the
508         * value <code>(byte)0</code>.
509         *
510         * @param value the <code>boolean</code> value to be written
511         * @throws JMSException if the JMS provider fails to write the message due
512         *                 to some internal error.
513         * @throws MessageNotWriteableException if the message is in read-only mode.
514         */
515        public void writeBoolean(boolean value) throws JMSException {
516            initializeWriting();
517            try {
518                this.dataOut.writeBoolean(value);
519            } catch (IOException ioe) {
520                throw JMSExceptionSupport.create(ioe);
521            }
522        }
523    
524        /**
525         * Writes a <code>byte</code> to the bytes message stream as a 1-byte
526         * value.
527         *
528         * @param value the <code>byte</code> value to be written
529         * @throws JMSException if the JMS provider fails to write the message due
530         *                 to some internal error.
531         * @throws MessageNotWriteableException if the message is in read-only mode.
532         */
533        public void writeByte(byte value) throws JMSException {
534            initializeWriting();
535            try {
536                this.dataOut.writeByte(value);
537            } catch (IOException ioe) {
538                throw JMSExceptionSupport.create(ioe);
539            }
540        }
541    
542        /**
543         * Writes a <code>short</code> to the bytes message stream as two bytes,
544         * high byte first.
545         *
546         * @param value the <code>short</code> to be written
547         * @throws JMSException if the JMS provider fails to write the message due
548         *                 to some internal error.
549         * @throws MessageNotWriteableException if the message is in read-only mode.
550         */
551        public void writeShort(short value) throws JMSException {
552            initializeWriting();
553            try {
554                this.dataOut.writeShort(value);
555            } catch (IOException ioe) {
556                throw JMSExceptionSupport.create(ioe);
557            }
558        }
559    
560        /**
561         * Writes a <code>char</code> to the bytes message stream as a 2-byte
562         * value, high byte first.
563         *
564         * @param value the <code>char</code> value to be written
565         * @throws JMSException if the JMS provider fails to write the message due
566         *                 to some internal error.
567         * @throws MessageNotWriteableException if the message is in read-only mode.
568         */
569        public void writeChar(char value) throws JMSException {
570            initializeWriting();
571            try {
572                this.dataOut.writeChar(value);
573            } catch (IOException ioe) {
574                throw JMSExceptionSupport.create(ioe);
575            }
576        }
577    
578        /**
579         * Writes an <code>int</code> to the bytes message stream as four bytes,
580         * high byte first.
581         *
582         * @param value the <code>int</code> to be written
583         * @throws JMSException if the JMS provider fails to write the message due
584         *                 to some internal error.
585         * @throws MessageNotWriteableException if the message is in read-only mode.
586         */
587        public void writeInt(int value) throws JMSException {
588            initializeWriting();
589            try {
590                this.dataOut.writeInt(value);
591            } catch (IOException ioe) {
592                throw JMSExceptionSupport.create(ioe);
593            }
594        }
595    
596        /**
597         * Writes a <code>long</code> to the bytes message stream as eight bytes,
598         * high byte first.
599         *
600         * @param value the <code>long</code> to be written
601         * @throws JMSException if the JMS provider fails to write the message due
602         *                 to some internal error.
603         * @throws MessageNotWriteableException if the message is in read-only mode.
604         */
605        public void writeLong(long value) throws JMSException {
606            initializeWriting();
607            try {
608                this.dataOut.writeLong(value);
609            } catch (IOException ioe) {
610                throw JMSExceptionSupport.create(ioe);
611            }
612        }
613    
614        /**
615         * Converts the <code>float</code> argument to an <code>int</code> using
616         * the <code>floatToIntBits</code> method in class <code>Float</code>,
617         * and then writes that <code>int</code> value to the bytes message stream
618         * as a 4-byte quantity, high byte first.
619         *
620         * @param value the <code>float</code> value to be written
621         * @throws JMSException if the JMS provider fails to write the message due
622         *                 to some internal error.
623         * @throws MessageNotWriteableException if the message is in read-only mode.
624         */
625        public void writeFloat(float value) throws JMSException {
626            initializeWriting();
627            try {
628                this.dataOut.writeFloat(value);
629            } catch (IOException ioe) {
630                throw JMSExceptionSupport.create(ioe);
631            }
632        }
633    
634        /**
635         * Converts the <code>double</code> argument to a <code>long</code>
636         * using the <code>doubleToLongBits</code> method in class
637         * <code>Double</code>, and then writes that <code>long</code> value to
638         * the bytes message stream as an 8-byte quantity, high byte first.
639         *
640         * @param value the <code>double</code> value to be written
641         * @throws JMSException if the JMS provider fails to write the message due
642         *                 to some internal error.
643         * @throws MessageNotWriteableException if the message is in read-only mode.
644         */
645        public void writeDouble(double value) throws JMSException {
646            initializeWriting();
647            try {
648                this.dataOut.writeDouble(value);
649            } catch (IOException ioe) {
650                throw JMSExceptionSupport.create(ioe);
651            }
652        }
653    
654        /**
655         * Writes a string to the bytes message stream using UTF-8 encoding in a
656         * machine-independent manner.
657         * <P>
658         * For more information on the UTF-8 format, see "File System Safe UCS
659         * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
660         * X/Open Company Ltd., Document Number: P316. This information also appears
661         * in ISO/IEC 10646, Annex P.
662         *
663         * @param value the <code>String</code> value to be written
664         * @throws JMSException if the JMS provider fails to write the message due
665         *                 to some internal error.
666         * @throws MessageNotWriteableException if the message is in read-only mode.
667         */
668        public void writeUTF(String value) throws JMSException {
669            initializeWriting();
670            try {
671                this.dataOut.writeUTF(value);
672            } catch (IOException ioe) {
673                throw JMSExceptionSupport.create(ioe);
674            }
675        }
676    
677        /**
678         * Writes a byte array to the bytes message stream.
679         *
680         * @param value the byte array to be written
681         * @throws JMSException if the JMS provider fails to write the message due
682         *                 to some internal error.
683         * @throws MessageNotWriteableException if the message is in read-only mode.
684         */
685        public void writeBytes(byte[] value) throws JMSException {
686            initializeWriting();
687            try {
688                this.dataOut.write(value);
689            } catch (IOException ioe) {
690                throw JMSExceptionSupport.create(ioe);
691            }
692        }
693    
694        /**
695         * Writes a portion of a byte array to the bytes message stream.
696         *
697         * @param value the byte array value to be written
698         * @param offset the initial offset within the byte array
699         * @param length the number of bytes to use
700         * @throws JMSException if the JMS provider fails to write the message due
701         *                 to some internal error.
702         * @throws MessageNotWriteableException if the message is in read-only mode.
703         */
704        public void writeBytes(byte[] value, int offset, int length) throws JMSException {
705            initializeWriting();
706            try {
707                this.dataOut.write(value, offset, length);
708            } catch (IOException ioe) {
709                throw JMSExceptionSupport.create(ioe);
710            }
711        }
712    
713        /**
714         * Writes an object to the bytes message stream.
715         * <P>
716         * This method works only for the objectified primitive object types (<code>Integer</code>,<code>Double</code>,
717         * <code>Long</code> &nbsp;...), <code>String</code> objects, and byte
718         * arrays.
719         *
720         * @param value the object in the Java programming language ("Java object")
721         *                to be written; it must not be null
722         * @throws JMSException if the JMS provider fails to write the message due
723         *                 to some internal error.
724         * @throws MessageFormatException if the object is of an invalid type.
725         * @throws MessageNotWriteableException if the message is in read-only mode.
726         * @throws java.lang.NullPointerException if the parameter
727         *                 <code>value</code> is null.
728         */
729        public void writeObject(Object value) throws JMSException {
730            if (value == null) {
731                throw new NullPointerException();
732            }
733            initializeWriting();
734            if (value instanceof Boolean) {
735                writeBoolean(((Boolean)value).booleanValue());
736            } else if (value instanceof Character) {
737                writeChar(((Character)value).charValue());
738            } else if (value instanceof Byte) {
739                writeByte(((Byte)value).byteValue());
740            } else if (value instanceof Short) {
741                writeShort(((Short)value).shortValue());
742            } else if (value instanceof Integer) {
743                writeInt(((Integer)value).intValue());
744            } else if (value instanceof Long) {
745                writeLong(((Long)value).longValue());
746            } else if (value instanceof Float) {
747                writeFloat(((Float)value).floatValue());
748            } else if (value instanceof Double) {
749                writeDouble(((Double)value).doubleValue());
750            } else if (value instanceof String) {
751                writeUTF(value.toString());
752            } else if (value instanceof byte[]) {
753                writeBytes((byte[])value);
754            } else {
755                throw new MessageFormatException("Cannot write non-primitive type:" + value.getClass());
756            }
757        }
758    
759        /**
760         * Puts the message body in read-only mode and repositions the stream of
761         * bytes to the beginning.
762         *
763         * @throws JMSException if an internal error occurs
764         */
765        public void reset() throws JMSException {
766            storeContent();
767            this.bytesOut = null;
768            this.dataIn = null;
769            this.dataOut = null;
770            setReadOnlyBody(true);
771        }
772    
773        private void initializeWriting() throws JMSException {
774            checkReadOnlyBody();
775            if (this.dataOut == null) {
776                this.bytesOut = new ByteArrayOutputStream();
777                OutputStream os = bytesOut;
778                ActiveMQConnection connection = getConnection();
779                if (connection != null && connection.isUseCompression()) {
780                    // keep track of the real length of the content if
781                    // we are compressed.
782                    try {
783                        os.write(new byte[4]);
784                    } catch (IOException e) {
785                        throw JMSExceptionSupport.create(e);
786                    }
787                    length = 0;
788                    compressed = true;
789                    final Deflater deflater = new Deflater(Deflater.BEST_SPEED);
790                    os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
791                        public void write(byte[] arg0) throws IOException {
792                            length += arg0.length;
793                            out.write(arg0);
794                        }
795    
796                        public void write(byte[] arg0, int arg1, int arg2) throws IOException {
797                            length += arg2;
798                            out.write(arg0, arg1, arg2);
799                        }
800    
801                        public void write(int arg0) throws IOException {
802                            length++;
803                            out.write(arg0);
804                        }
805    
806                        @Override
807                        public void close() throws IOException {
808                            super.close();
809                            deflater.end();
810                        }
811                    };
812                }
813                this.dataOut = new DataOutputStream(os);
814            }
815        }
816    
817        protected void checkWriteOnlyBody() throws MessageNotReadableException {
818            if (!readOnlyBody) {
819                throw new MessageNotReadableException("Message body is write-only");
820            }
821        }
822    
823        private void initializeReading() throws JMSException {
824            checkWriteOnlyBody();
825            if (dataIn == null) {
826                ByteSequence data = getContent();
827                if (data == null) {
828                    data = new ByteSequence(new byte[] {}, 0, 0);
829                }
830                InputStream is = new ByteArrayInputStream(data);
831                if (isCompressed()) {
832                    // keep track of the real length of the content if
833                    // we are compressed.
834                    try {
835                        DataInputStream dis = new DataInputStream(is);
836                        length = dis.readInt();
837                        dis.close();
838                    } catch (IOException e) {
839                        throw JMSExceptionSupport.create(e);
840                    }
841                    is = new InflaterInputStream(is);
842                } else {
843                    length = data.getLength();
844                }
845                dataIn = new DataInputStream(is);
846            }
847        }
848    
849        public void setObjectProperty(String name, Object value) throws JMSException {
850            initializeWriting();
851            super.setObjectProperty(name, value);
852        }
853    
854        public String toString() {
855            return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
856        }
857    
858        @Override
859        protected void doCompress() throws IOException {
860            compressed = true;
861            ByteSequence bytes = getContent();
862            int length = bytes.getLength();
863            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
864            bytesOut.write(new byte[4]);
865            DeflaterOutputStream os = new DeflaterOutputStream(bytesOut);
866            DataOutputStream dataOut = new DataOutputStream(os);
867            dataOut.write(bytes.data, bytes.offset, bytes.length);
868            dataOut.flush();
869            dataOut.close();
870            bytes = bytesOut.toByteSequence();
871            ByteSequenceData.writeIntBig(bytes, length);
872            bytes.offset = 0;
873            setContent(bytes);
874        }
875    }