001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.EOFException;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.util.Arrays;
026import java.util.zip.Deflater;
027import java.util.zip.Inflater;
028
029import javax.jms.BytesMessage;
030import javax.jms.JMSException;
031import javax.jms.MessageEOFException;
032import javax.jms.MessageFormatException;
033import javax.jms.MessageNotReadableException;
034import javax.jms.MessageNotWriteableException;
035
036import org.apache.activemq.ActiveMQConnection;
037import org.apache.activemq.util.ByteArrayInputStream;
038import org.apache.activemq.util.ByteArrayOutputStream;
039import org.apache.activemq.util.ByteSequence;
040import org.apache.activemq.util.ByteSequenceData;
041import org.apache.activemq.util.JMSExceptionSupport;
042
043/**
044 * A <CODE>BytesMessage</CODE> object is used to send a message containing a
045 * stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE>
046 * interface and adds a bytes message body. The receiver of the message supplies
047 * the interpretation of the bytes.
048 * <P>
049 * The <CODE>BytesMessage</CODE> methods are based largely on those found in
050 * <CODE>java.io.DataInputStream</CODE> and
051 * <CODE>java.io.DataOutputStream</CODE>.
052 * <P>
053 * This message type is for client encoding of existing message formats. If
054 * possible, one of the other self-defining message types should be used
055 * instead.
056 * <P>
057 * Although the JMS API allows the use of message properties with byte messages,
058 * they are typically not used, since the inclusion of properties may affect the
059 * format.
060 * <P>
061 * The primitive types can be written explicitly using methods for each type.
062 * They may also be written generically as objects. For instance, a call to
063 * <CODE>BytesMessage.writeInt(6)</CODE> is equivalent to
064 * <CODE> BytesMessage.writeObject(new Integer(6))</CODE>. Both forms are
065 * provided, because the explicit form is convenient for static programming, and
066 * the object form is needed when types are not known at compile time.
067 * <P>
068 * When the message is first created, and when <CODE>clearBody</CODE> is
069 * called, the body of the message is in write-only mode. After the first call
070 * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
071 * After a message has been sent, the client that sent it can retain and modify
072 * it without affecting the message that has been sent. The same message object
073 * can be sent multiple times. When a message has been received, the provider
074 * has called <CODE>reset</CODE> so that the message body is in read-only mode
075 * for the client.
076 * <P>
077 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
078 * message body is cleared and the message is in write-only mode.
079 * <P>
080 * If a client attempts to read a message in write-only mode, a
081 * <CODE>MessageNotReadableException</CODE> is thrown.
082 * <P>
083 * If a client attempts to write a message in read-only mode, a
084 * <CODE>MessageNotWriteableException</CODE> is thrown.
085 *
086 * @openwire:marshaller code=24
087 * @see javax.jms.Session#createBytesMessage()
088 * @see javax.jms.MapMessage
089 * @see javax.jms.Message
090 * @see javax.jms.ObjectMessage
091 * @see javax.jms.StreamMessage
092 * @see javax.jms.TextMessage
093 */
094public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage {
095
    /** Data structure type code of this message class, taken from {@code CommandTypes.ACTIVEMQ_BYTES_MESSAGE}. */
    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;

    /** Stream the {@code write*} methods write to; closed and cleared by {@code storeContent()}. */
    protected transient DataOutputStream dataOut;
    /** Byte buffer whose byte sequence {@code storeContent()} installs as the message content. */
    protected transient ByteArrayOutputStream bytesOut;
    /** Stream the {@code read*} methods read from. */
    protected transient DataInputStream dataIn;
    /** Value reported by {@code getBodyLength()}; NOTE(review): assigned outside the code visible here - confirm where. */
    protected transient int length;
102
103    @Override
104    public Message copy() {
105        ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
106        copy(copy);
107        return copy;
108    }
109
110    private void copy(ActiveMQBytesMessage copy) {
111        storeContent();
112        super.copy(copy);
113        copy.dataOut = null;
114        copy.bytesOut = null;
115        copy.dataIn = null;
116    }
117
118    @Override
119    public void onSend() throws JMSException {
120        super.onSend();
121        storeContent();
122    }
123
124    @Override
125    public void storeContent() {
126        if (dataOut != null) {
127            try {
128                dataOut.close();
129                ByteSequence bs = bytesOut.toByteSequence();
130                setContent(bs);
131
132                ActiveMQConnection connection = getConnection();
133                if (connection != null && connection.isUseCompression()) {
134                    doCompress();
135                }
136            } catch (IOException ioe) {
137                throw new RuntimeException(ioe.getMessage(), ioe);
138            } finally {
139                try {
140                    if (bytesOut != null) {
141                        bytesOut.close();
142                        bytesOut = null;
143                    }
144                    if (dataOut != null) {
145                        dataOut.close();
146                        dataOut = null;
147                    }
148                } catch (IOException ioe) {
149                }
150            }
151        }
152    }
153
154    @Override
155    public byte getDataStructureType() {
156        return DATA_STRUCTURE_TYPE;
157    }
158
159    @Override
160    public String getJMSXMimeType() {
161        return "jms/bytes-message";
162    }
163
164    /**
165     * Clears out the message body. Clearing a message's body does not clear its
166     * header values or property entries.
167     * <P>
168     * If this message body was read-only, calling this method leaves the
169     * message body in the same state as an empty body in a newly created
170     * message.
171     *
172     * @throws JMSException if the JMS provider fails to clear the message body
173     *                 due to some internal error.
174     */
175    @Override
176    public void clearBody() throws JMSException {
177        super.clearBody();
178        this.dataOut = null;
179        this.dataIn = null;
180        this.bytesOut = null;
181    }
182
183    /**
184     * Gets the number of bytes of the message body when the message is in
185     * read-only mode. The value returned can be used to allocate a byte array.
186     * The value returned is the entire length of the message body, regardless
187     * of where the pointer for reading the message is currently located.
188     *
189     * @return number of bytes in the message
190     * @throws JMSException if the JMS provider fails to read the message due to
191     *                 some internal error.
192     * @throws MessageNotReadableException if the message is in write-only mode.
193     * @since 1.1
194     */
195
196    @Override
197    public long getBodyLength() throws JMSException {
198        initializeReading();
199        return length;
200    }
201
202    /**
203     * Reads a <code>boolean</code> from the bytes message stream.
204     *
205     * @return the <code>boolean</code> value read
206     * @throws JMSException if the JMS provider fails to read the message due to
207     *                 some internal error.
208     * @throws MessageEOFException if unexpected end of bytes stream has been
209     *                 reached.
210     * @throws MessageNotReadableException if the message is in write-only mode.
211     */
212    @Override
213    public boolean readBoolean() throws JMSException {
214        initializeReading();
215        try {
216            return this.dataIn.readBoolean();
217        } catch (EOFException e) {
218            throw JMSExceptionSupport.createMessageEOFException(e);
219        } catch (IOException e) {
220            throw JMSExceptionSupport.createMessageFormatException(e);
221        }
222    }
223
224    /**
225     * Reads a signed 8-bit value from the bytes message stream.
226     *
227     * @return the next byte from the bytes message stream as a signed 8-bit
228     *         <code>byte</code>
229     * @throws JMSException if the JMS provider fails to read the message due to
230     *                 some internal error.
231     * @throws MessageEOFException if unexpected end of bytes stream has been
232     *                 reached.
233     * @throws MessageNotReadableException if the message is in write-only mode.
234     */
235    @Override
236    public byte readByte() throws JMSException {
237        initializeReading();
238        try {
239            return this.dataIn.readByte();
240        } catch (EOFException e) {
241            throw JMSExceptionSupport.createMessageEOFException(e);
242        } catch (IOException e) {
243            throw JMSExceptionSupport.createMessageFormatException(e);
244        }
245    }
246
247    /**
248     * Reads an unsigned 8-bit number from the bytes message stream.
249     *
250     * @return the next byte from the bytes message stream, interpreted as an
251     *         unsigned 8-bit number
252     * @throws JMSException if the JMS provider fails to read the message due to
253     *                 some internal error.
254     * @throws MessageEOFException if unexpected end of bytes stream has been
255     *                 reached.
256     * @throws MessageNotReadableException if the message is in write-only mode.
257     */
258    @Override
259    public int readUnsignedByte() throws JMSException {
260        initializeReading();
261        try {
262            return this.dataIn.readUnsignedByte();
263        } catch (EOFException e) {
264            throw JMSExceptionSupport.createMessageEOFException(e);
265        } catch (IOException e) {
266            throw JMSExceptionSupport.createMessageFormatException(e);
267        }
268    }
269
270    /**
271     * Reads a signed 16-bit number from the bytes message stream.
272     *
273     * @return the next two bytes from the bytes message stream, interpreted as
274     *         a signed 16-bit number
275     * @throws JMSException if the JMS provider fails to read the message due to
276     *                 some internal error.
277     * @throws MessageEOFException if unexpected end of bytes stream has been
278     *                 reached.
279     * @throws MessageNotReadableException if the message is in write-only mode.
280     */
281    @Override
282    public short readShort() throws JMSException {
283        initializeReading();
284        try {
285            return this.dataIn.readShort();
286        } catch (EOFException e) {
287            throw JMSExceptionSupport.createMessageEOFException(e);
288        } catch (IOException e) {
289            throw JMSExceptionSupport.createMessageFormatException(e);
290        }
291    }
292
293    /**
294     * Reads an unsigned 16-bit number from the bytes message stream.
295     *
296     * @return the next two bytes from the bytes message stream, interpreted as
297     *         an unsigned 16-bit integer
298     * @throws JMSException if the JMS provider fails to read the message due to
299     *                 some internal error.
300     * @throws MessageEOFException if unexpected end of bytes stream has been
301     *                 reached.
302     * @throws MessageNotReadableException if the message is in write-only mode.
303     */
304    @Override
305    public int readUnsignedShort() throws JMSException {
306        initializeReading();
307        try {
308            return this.dataIn.readUnsignedShort();
309        } catch (EOFException e) {
310            throw JMSExceptionSupport.createMessageEOFException(e);
311        } catch (IOException e) {
312            throw JMSExceptionSupport.createMessageFormatException(e);
313        }
314    }
315
316    /**
317     * Reads a Unicode character value from the bytes message stream.
318     *
319     * @return the next two bytes from the bytes message stream as a Unicode
320     *         character
321     * @throws JMSException if the JMS provider fails to read the message due to
322     *                 some internal error.
323     * @throws MessageEOFException if unexpected end of bytes stream has been
324     *                 reached.
325     * @throws MessageNotReadableException if the message is in write-only mode.
326     */
327    @Override
328    public char readChar() throws JMSException {
329        initializeReading();
330        try {
331            return this.dataIn.readChar();
332        } catch (EOFException e) {
333            throw JMSExceptionSupport.createMessageEOFException(e);
334        } catch (IOException e) {
335            throw JMSExceptionSupport.createMessageFormatException(e);
336        }
337    }
338
339    /**
340     * Reads a signed 32-bit integer from the bytes message stream.
341     *
342     * @return the next four bytes from the bytes message stream, interpreted as
343     *         an <code>int</code>
344     * @throws JMSException if the JMS provider fails to read the message due to
345     *                 some internal error.
346     * @throws MessageEOFException if unexpected end of bytes stream has been
347     *                 reached.
348     * @throws MessageNotReadableException if the message is in write-only mode.
349     */
350    @Override
351    public int readInt() throws JMSException {
352        initializeReading();
353        try {
354            return this.dataIn.readInt();
355        } catch (EOFException e) {
356            throw JMSExceptionSupport.createMessageEOFException(e);
357        } catch (IOException e) {
358            throw JMSExceptionSupport.createMessageFormatException(e);
359        }
360    }
361
362    /**
363     * Reads a signed 64-bit integer from the bytes message stream.
364     *
365     * @return the next eight bytes from the bytes message stream, interpreted
366     *         as a <code>long</code>
367     * @throws JMSException if the JMS provider fails to read the message due to
368     *                 some internal error.
369     * @throws MessageEOFException if unexpected end of bytes stream has been
370     *                 reached.
371     * @throws MessageNotReadableException if the message is in write-only mode.
372     */
373    @Override
374    public long readLong() throws JMSException {
375        initializeReading();
376        try {
377            return this.dataIn.readLong();
378        } catch (EOFException e) {
379            throw JMSExceptionSupport.createMessageEOFException(e);
380        } catch (IOException e) {
381            throw JMSExceptionSupport.createMessageFormatException(e);
382        }
383    }
384
385    /**
386     * Reads a <code>float</code> from the bytes message stream.
387     *
388     * @return the next four bytes from the bytes message stream, interpreted as
389     *         a <code>float</code>
390     * @throws JMSException if the JMS provider fails to read the message due to
391     *                 some internal error.
392     * @throws MessageEOFException if unexpected end of bytes stream has been
393     *                 reached.
394     * @throws MessageNotReadableException if the message is in write-only mode.
395     */
396    @Override
397    public float readFloat() throws JMSException {
398        initializeReading();
399        try {
400            return this.dataIn.readFloat();
401        } catch (EOFException e) {
402            throw JMSExceptionSupport.createMessageEOFException(e);
403        } catch (IOException e) {
404            throw JMSExceptionSupport.createMessageFormatException(e);
405        }
406    }
407
408    /**
409     * Reads a <code>double</code> from the bytes message stream.
410     *
411     * @return the next eight bytes from the bytes message stream, interpreted
412     *         as a <code>double</code>
413     * @throws JMSException if the JMS provider fails to read the message due to
414     *                 some internal error.
415     * @throws MessageEOFException if unexpected end of bytes stream has been
416     *                 reached.
417     * @throws MessageNotReadableException if the message is in write-only mode.
418     */
419    @Override
420    public double readDouble() throws JMSException {
421        initializeReading();
422        try {
423            return this.dataIn.readDouble();
424        } catch (EOFException e) {
425            throw JMSExceptionSupport.createMessageEOFException(e);
426        } catch (IOException e) {
427            throw JMSExceptionSupport.createMessageFormatException(e);
428        }
429    }
430
431    /**
432     * Reads a string that has been encoded using a modified UTF-8 format from
433     * the bytes message stream.
434     * <P>
435     * For more information on the UTF-8 format, see "File System Safe UCS
436     * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
437     * X/Open Company Ltd., Document Number: P316. This information also appears
438     * in ISO/IEC 10646, Annex P.
439     *
440     * @return a Unicode string from the bytes message stream
441     * @throws JMSException if the JMS provider fails to read the message due to
442     *                 some internal error.
443     * @throws MessageEOFException if unexpected end of bytes stream has been
444     *                 reached.
445     * @throws MessageNotReadableException if the message is in write-only mode.
446     */
447    @Override
448    public String readUTF() throws JMSException {
449        initializeReading();
450        try {
451            return this.dataIn.readUTF();
452        } catch (EOFException e) {
453            throw JMSExceptionSupport.createMessageEOFException(e);
454        } catch (IOException e) {
455            throw JMSExceptionSupport.createMessageFormatException(e);
456        }
457    }
458
459    /**
460     * Reads a byte array from the bytes message stream.
461     * <P>
462     * If the length of array <code>value</code> is less than the number of
463     * bytes remaining to be read from the stream, the array should be filled. A
464     * subsequent call reads the next increment, and so on.
465     * <P>
466     * If the number of bytes remaining in the stream is less than the length of
467     * array <code>value</code>, the bytes should be read into the array. The
468     * return value of the total number of bytes read will be less than the
469     * length of the array, indicating that there are no more bytes left to be
470     * read from the stream. The next read of the stream returns -1.
471     *
472     * @param value the buffer into which the data is read
473     * @return the total number of bytes read into the buffer, or -1 if there is
474     *         no more data because the end of the stream has been reached
475     * @throws JMSException if the JMS provider fails to read the message due to
476     *                 some internal error.
477     * @throws MessageNotReadableException if the message is in write-only mode.
478     */
479    @Override
480    public int readBytes(byte[] value) throws JMSException {
481        return readBytes(value, value.length);
482    }
483
484    /**
485     * Reads a portion of the bytes message stream.
486     * <P>
487     * If the length of array <code>value</code> is less than the number of
488     * bytes remaining to be read from the stream, the array should be filled. A
489     * subsequent call reads the next increment, and so on.
490     * <P>
491     * If the number of bytes remaining in the stream is less than the length of
492     * array <code>value</code>, the bytes should be read into the array. The
493     * return value of the total number of bytes read will be less than the
494     * length of the array, indicating that there are no more bytes left to be
495     * read from the stream. The next read of the stream returns -1. <p/> If
496     * <code>length</code> is negative, or <code>length</code> is greater
497     * than the length of the array <code>value</code>, then an
498     * <code>IndexOutOfBoundsException</code> is thrown. No bytes will be read
499     * from the stream for this exception case.
500     *
501     * @param value the buffer into which the data is read
502     * @param length the number of bytes to read; must be less than or equal to
503     *                <code>value.length</code>
504     * @return the total number of bytes read into the buffer, or -1 if there is
505     *         no more data because the end of the stream has been reached
506     * @throws JMSException if the JMS provider fails to read the message due to
507     *                 some internal error.
508     * @throws MessageNotReadableException if the message is in write-only mode.
509     */
510    @Override
511    public int readBytes(byte[] value, int length) throws JMSException {
512        initializeReading();
513        try {
514            int n = 0;
515            while (n < length) {
516                int count = this.dataIn.read(value, n, length - n);
517                if (count < 0) {
518                    break;
519                }
520                n += count;
521            }
522            if (n == 0 && length > 0) {
523                n = -1;
524            }
525            return n;
526        } catch (EOFException e) {
527            throw JMSExceptionSupport.createMessageEOFException(e);
528        } catch (IOException e) {
529            throw JMSExceptionSupport.createMessageFormatException(e);
530        }
531    }
532
533    /**
534     * Writes a <code>boolean</code> to the bytes message stream as a 1-byte
535     * value. The value <code>true</code> is written as the value
536     * <code>(byte)1</code>; the value <code>false</code> is written as the
537     * value <code>(byte)0</code>.
538     *
539     * @param value the <code>boolean</code> value to be written
540     * @throws JMSException if the JMS provider fails to write the message due
541     *                 to some internal error.
542     * @throws MessageNotWriteableException if the message is in read-only mode.
543     */
544    @Override
545    public void writeBoolean(boolean value) throws JMSException {
546        initializeWriting();
547        try {
548            this.dataOut.writeBoolean(value);
549        } catch (IOException ioe) {
550            throw JMSExceptionSupport.create(ioe);
551        }
552    }
553
554    /**
555     * Writes a <code>byte</code> to the bytes message stream as a 1-byte
556     * value.
557     *
558     * @param value the <code>byte</code> value to be written
559     * @throws JMSException if the JMS provider fails to write the message due
560     *                 to some internal error.
561     * @throws MessageNotWriteableException if the message is in read-only mode.
562     */
563    @Override
564    public void writeByte(byte value) throws JMSException {
565        initializeWriting();
566        try {
567            this.dataOut.writeByte(value);
568        } catch (IOException ioe) {
569            throw JMSExceptionSupport.create(ioe);
570        }
571    }
572
573    /**
574     * Writes a <code>short</code> to the bytes message stream as two bytes,
575     * high byte first.
576     *
577     * @param value the <code>short</code> to be written
578     * @throws JMSException if the JMS provider fails to write the message due
579     *                 to some internal error.
580     * @throws MessageNotWriteableException if the message is in read-only mode.
581     */
582    @Override
583    public void writeShort(short value) throws JMSException {
584        initializeWriting();
585        try {
586            this.dataOut.writeShort(value);
587        } catch (IOException ioe) {
588            throw JMSExceptionSupport.create(ioe);
589        }
590    }
591
592    /**
593     * Writes a <code>char</code> to the bytes message stream as a 2-byte
594     * value, high byte first.
595     *
596     * @param value the <code>char</code> value to be written
597     * @throws JMSException if the JMS provider fails to write the message due
598     *                 to some internal error.
599     * @throws MessageNotWriteableException if the message is in read-only mode.
600     */
601    @Override
602    public void writeChar(char value) throws JMSException {
603        initializeWriting();
604        try {
605            this.dataOut.writeChar(value);
606        } catch (IOException ioe) {
607            throw JMSExceptionSupport.create(ioe);
608        }
609    }
610
611    /**
612     * Writes an <code>int</code> to the bytes message stream as four bytes,
613     * high byte first.
614     *
615     * @param value the <code>int</code> to be written
616     * @throws JMSException if the JMS provider fails to write the message due
617     *                 to some internal error.
618     * @throws MessageNotWriteableException if the message is in read-only mode.
619     */
620    @Override
621    public void writeInt(int value) throws JMSException {
622        initializeWriting();
623        try {
624            this.dataOut.writeInt(value);
625        } catch (IOException ioe) {
626            throw JMSExceptionSupport.create(ioe);
627        }
628    }
629
630    /**
631     * Writes a <code>long</code> to the bytes message stream as eight bytes,
632     * high byte first.
633     *
634     * @param value the <code>long</code> to be written
635     * @throws JMSException if the JMS provider fails to write the message due
636     *                 to some internal error.
637     * @throws MessageNotWriteableException if the message is in read-only mode.
638     */
639    @Override
640    public void writeLong(long value) throws JMSException {
641        initializeWriting();
642        try {
643            this.dataOut.writeLong(value);
644        } catch (IOException ioe) {
645            throw JMSExceptionSupport.create(ioe);
646        }
647    }
648
649    /**
650     * Converts the <code>float</code> argument to an <code>int</code> using
651     * the <code>floatToIntBits</code> method in class <code>Float</code>,
652     * and then writes that <code>int</code> value to the bytes message stream
653     * as a 4-byte quantity, high byte first.
654     *
655     * @param value the <code>float</code> value to be written
656     * @throws JMSException if the JMS provider fails to write the message due
657     *                 to some internal error.
658     * @throws MessageNotWriteableException if the message is in read-only mode.
659     */
660    @Override
661    public void writeFloat(float value) throws JMSException {
662        initializeWriting();
663        try {
664            this.dataOut.writeFloat(value);
665        } catch (IOException ioe) {
666            throw JMSExceptionSupport.create(ioe);
667        }
668    }
669
670    /**
671     * Converts the <code>double</code> argument to a <code>long</code>
672     * using the <code>doubleToLongBits</code> method in class
673     * <code>Double</code>, and then writes that <code>long</code> value to
674     * the bytes message stream as an 8-byte quantity, high byte first.
675     *
676     * @param value the <code>double</code> value to be written
677     * @throws JMSException if the JMS provider fails to write the message due
678     *                 to some internal error.
679     * @throws MessageNotWriteableException if the message is in read-only mode.
680     */
681    @Override
682    public void writeDouble(double value) throws JMSException {
683        initializeWriting();
684        try {
685            this.dataOut.writeDouble(value);
686        } catch (IOException ioe) {
687            throw JMSExceptionSupport.create(ioe);
688        }
689    }
690
691    /**
692     * Writes a string to the bytes message stream using UTF-8 encoding in a
693     * machine-independent manner.
694     * <P>
695     * For more information on the UTF-8 format, see "File System Safe UCS
696     * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
697     * X/Open Company Ltd., Document Number: P316. This information also appears
698     * in ISO/IEC 10646, Annex P.
699     *
700     * @param value the <code>String</code> value to be written
701     * @throws JMSException if the JMS provider fails to write the message due
702     *                 to some internal error.
703     * @throws MessageNotWriteableException if the message is in read-only mode.
704     */
705    @Override
706    public void writeUTF(String value) throws JMSException {
707        initializeWriting();
708        try {
709            this.dataOut.writeUTF(value);
710        } catch (IOException ioe) {
711            throw JMSExceptionSupport.create(ioe);
712        }
713    }
714
715    /**
716     * Writes a byte array to the bytes message stream.
717     *
718     * @param value the byte array to be written
719     * @throws JMSException if the JMS provider fails to write the message due
720     *                 to some internal error.
721     * @throws MessageNotWriteableException if the message is in read-only mode.
722     */
723    @Override
724    public void writeBytes(byte[] value) throws JMSException {
725        initializeWriting();
726        try {
727            this.dataOut.write(value);
728        } catch (IOException ioe) {
729            throw JMSExceptionSupport.create(ioe);
730        }
731    }
732
733    /**
734     * Writes a portion of a byte array to the bytes message stream.
735     *
736     * @param value the byte array value to be written
737     * @param offset the initial offset within the byte array
738     * @param length the number of bytes to use
739     * @throws JMSException if the JMS provider fails to write the message due
740     *                 to some internal error.
741     * @throws MessageNotWriteableException if the message is in read-only mode.
742     */
743    @Override
744    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
745        initializeWriting();
746        try {
747            this.dataOut.write(value, offset, length);
748        } catch (IOException ioe) {
749            throw JMSExceptionSupport.create(ioe);
750        }
751    }
752
753    /**
754     * Writes an object to the bytes message stream.
755     * <P>
756     * This method works only for the objectified primitive object types (<code>Integer</code>,<code>Double</code>,
757     * <code>Long</code> &nbsp;...), <code>String</code> objects, and byte
758     * arrays.
759     *
760     * @param value the object in the Java programming language ("Java object")
761     *                to be written; it must not be null
762     * @throws JMSException if the JMS provider fails to write the message due
763     *                 to some internal error.
764     * @throws MessageFormatException if the object is of an invalid type.
765     * @throws MessageNotWriteableException if the message is in read-only mode.
766     * @throws java.lang.NullPointerException if the parameter
767     *                 <code>value</code> is null.
768     */
769    @Override
770    public void writeObject(Object value) throws JMSException {
771        if (value == null) {
772            throw new NullPointerException();
773        }
774        initializeWriting();
775        if (value instanceof Boolean) {
776            writeBoolean(((Boolean)value).booleanValue());
777        } else if (value instanceof Character) {
778            writeChar(((Character)value).charValue());
779        } else if (value instanceof Byte) {
780            writeByte(((Byte)value).byteValue());
781        } else if (value instanceof Short) {
782            writeShort(((Short)value).shortValue());
783        } else if (value instanceof Integer) {
784            writeInt(((Integer)value).intValue());
785        } else if (value instanceof Long) {
786            writeLong(((Long)value).longValue());
787        } else if (value instanceof Float) {
788            writeFloat(((Float)value).floatValue());
789        } else if (value instanceof Double) {
790            writeDouble(((Double)value).doubleValue());
791        } else if (value instanceof String) {
792            writeUTF(value.toString());
793        } else if (value instanceof byte[]) {
794            writeBytes((byte[])value);
795        } else {
796            throw new MessageFormatException("Cannot write non-primitive type:" + value.getClass());
797        }
798    }
799
800    /**
801     * Puts the message body in read-only mode and repositions the stream of
802     * bytes to the beginning.
803     *
804     * @throws JMSException if an internal error occurs
805     */
806    @Override
807    public void reset() throws JMSException {
808        storeContent();
809        setReadOnlyBody(true);
810        try {
811            if (bytesOut != null) {
812                bytesOut.close();
813                bytesOut = null;
814            }
815            if (dataIn != null) {
816                dataIn.close();
817                dataIn = null;
818            }
819            if (dataOut != null) {
820                dataOut.close();
821                dataOut = null;
822            }
823        } catch (IOException ioe) {
824            throw JMSExceptionSupport.create(ioe);
825        }
826    }
827
828    private void initializeWriting() throws JMSException {
829        checkReadOnlyBody();
830        if (this.dataOut == null) {
831            this.bytesOut = new ByteArrayOutputStream();
832            OutputStream os = bytesOut;
833            this.dataOut = new DataOutputStream(os);
834        }
835
836        restoreOldContent();
837    }
838
839    private void restoreOldContent() throws JMSException {
840        // For a message that already had a body and was sent we need to restore the content
841        // if the message is used again without having its clearBody method called.
842        if (this.content != null && this.content.length > 0) {
843            try {
844                ByteSequence toRestore = this.content;
845                if (isCompressed()) {
846                    toRestore = new ByteSequence(decompress(this.content));
847                    compressed = false;
848                }
849
850                this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength());
851                // Free up the buffer from the old content, will be re-written when
852                // the message is sent again and storeContent() is called.
853                this.content = null;
854            } catch (IOException ioe) {
855                throw JMSExceptionSupport.create(ioe);
856            }
857        }
858    }
859
860    protected void checkWriteOnlyBody() throws MessageNotReadableException {
861        if (!readOnlyBody) {
862            throw new MessageNotReadableException("Message body is write-only");
863        }
864    }
865
866    private void initializeReading() throws JMSException {
867        checkWriteOnlyBody();
868        if (dataIn == null) {
869            try {
870                ByteSequence data = getContent();
871                if (data == null) {
872                    data = new ByteSequence(new byte[] {}, 0, 0);
873                }
874                InputStream is = new ByteArrayInputStream(data);
875                if (isCompressed()) {
876                    if (data.length != 0) {
877                        is = new ByteArrayInputStream(decompress(data));
878                    }
879                } else {
880                    length = data.getLength();
881                }
882
883                dataIn = new DataInputStream(is);
884            } catch (IOException ioe) {
885                throw JMSExceptionSupport.create(ioe);
886            }
887        }
888    }
889
890    protected byte[] decompress(ByteSequence dataSequence) throws IOException {
891        Inflater inflater = new Inflater();
892        ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
893        try {
894            length = ByteSequenceData.readIntBig(dataSequence);
895            dataSequence.offset = 0;
896            byte[] data = Arrays.copyOfRange(dataSequence.getData(), 4, dataSequence.getLength());
897            inflater.setInput(data);
898            byte[] buffer = new byte[length];
899            int count = inflater.inflate(buffer);
900            decompressed.write(buffer, 0, count);
901            return decompressed.toByteArray();
902        } catch (Exception e) {
903            throw new IOException(e);
904        } finally {
905            inflater.end();
906            decompressed.close();
907        }
908    }
909
910    @Override
911    public void setObjectProperty(String name, Object value) throws JMSException {
912        initializeWriting();
913        super.setObjectProperty(name, value);
914    }
915
916    @Override
917    public String toString() {
918        return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
919    }
920
921    @Override
922    protected void doCompress() throws IOException {
923        compressed = true;
924        ByteSequence bytes = getContent();
925        if (bytes != null) {
926            int length = bytes.getLength();
927            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
928            compressed.write(new byte[4]);
929            Deflater deflater = new Deflater();
930            try {
931                deflater.setInput(bytes.data);
932                deflater.finish();
933                byte[] buffer = new byte[1024];
934                while (!deflater.finished()) {
935                    int count = deflater.deflate(buffer);
936                    compressed.write(buffer, 0, count);
937                }
938
939                bytes = compressed.toByteSequence();
940                ByteSequenceData.writeIntBig(bytes, length);
941                bytes.offset = 0;
942                setContent(bytes);
943            } finally {
944                deflater.end();
945                compressed.close();
946            }
947        }
948    }
949}