001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.command;
018
019 import java.io.DataInputStream;
020 import java.io.DataOutputStream;
021 import java.io.EOFException;
022 import java.io.FilterOutputStream;
023 import java.io.IOException;
024 import java.io.InputStream;
025 import java.io.OutputStream;
026 import java.util.zip.Deflater;
027 import java.util.zip.DeflaterOutputStream;
028 import java.util.zip.InflaterInputStream;
029
030 import javax.jms.BytesMessage;
031 import javax.jms.JMSException;
032 import javax.jms.MessageFormatException;
033 import javax.jms.MessageNotReadableException;
034
035 import org.apache.activemq.ActiveMQConnection;
036 import org.apache.activemq.util.ByteArrayInputStream;
037 import org.apache.activemq.util.ByteArrayOutputStream;
038 import org.apache.activemq.util.ByteSequence;
039 import org.apache.activemq.util.ByteSequenceData;
040 import org.apache.activemq.util.JMSExceptionSupport;
041
042 /**
043 * A <CODE>BytesMessage</CODE> object is used to send a message containing a
044 * stream of uninterpreted bytes. It inherits from the <CODE>Message</CODE>
045 * interface and adds a bytes message body. The receiver of the message supplies
046 * the interpretation of the bytes.
047 * <P>
048 * The <CODE>BytesMessage</CODE> methods are based largely on those found in
049 * <CODE>java.io.DataInputStream</CODE> and
050 * <CODE>java.io.DataOutputStream</CODE>.
051 * <P>
052 * This message type is for client encoding of existing message formats. If
053 * possible, one of the other self-defining message types should be used
054 * instead.
055 * <P>
056 * Although the JMS API allows the use of message properties with byte messages,
057 * they are typically not used, since the inclusion of properties may affect the
058 * format.
059 * <P>
060 * The primitive types can be written explicitly using methods for each type.
061 * They may also be written generically as objects. For instance, a call to
062 * <CODE>BytesMessage.writeInt(6)</CODE> is equivalent to
063 * <CODE> BytesMessage.writeObject(new Integer(6))</CODE>. Both forms are
064 * provided, because the explicit form is convenient for static programming, and
065 * the object form is needed when types are not known at compile time.
066 * <P>
067 * When the message is first created, and when <CODE>clearBody</CODE> is
068 * called, the body of the message is in write-only mode. After the first call
069 * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
070 * After a message has been sent, the client that sent it can retain and modify
071 * it without affecting the message that has been sent. The same message object
072 * can be sent multiple times. When a message has been received, the provider
073 * has called <CODE>reset</CODE> so that the message body is in read-only mode
074 * for the client.
075 * <P>
076 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
077 * message body is cleared and the message is in write-only mode.
078 * <P>
079 * If a client attempts to read a message in write-only mode, a
080 * <CODE>MessageNotReadableException</CODE> is thrown.
081 * <P>
082 * If a client attempts to write a message in read-only mode, a
083 * <CODE>MessageNotWriteableException</CODE> is thrown.
084 *
085 * @openwire:marshaller code=24
086 * @see javax.jms.Session#createBytesMessage()
087 * @see javax.jms.MapMessage
088 * @see javax.jms.Message
089 * @see javax.jms.ObjectMessage
090 * @see javax.jms.StreamMessage
091 * @see javax.jms.TextMessage
092 */
093 public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage {
094
095 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BYTES_MESSAGE;
096
097 protected transient DataOutputStream dataOut;
098 protected transient ByteArrayOutputStream bytesOut;
099 protected transient DataInputStream dataIn;
100 protected transient int length;
101
102 public Message copy() {
103 ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
104 copy(copy);
105 return copy;
106 }
107
108 private void copy(ActiveMQBytesMessage copy) {
109 storeContent();
110 super.copy(copy);
111 copy.dataOut = null;
112 copy.bytesOut = null;
113 copy.dataIn = null;
114 }
115
116 public void onSend() throws JMSException {
117 super.onSend();
118 storeContent();
119 }
120
121 @Override
122 public void storeContent() {
123 try {
124 if (dataOut != null) {
125 dataOut.close();
126 ByteSequence bs = bytesOut.toByteSequence();
127 if (compressed) {
128 int pos = bs.offset;
129 ByteSequenceData.writeIntBig(bs, length);
130 bs.offset = pos;
131 }
132 setContent(bs);
133 bytesOut = null;
134 dataOut = null;
135 }
136 } catch (IOException ioe) {
137 throw new RuntimeException(ioe.getMessage(), ioe); // TODO verify
138 // RuntimeException
139 }
140 }
141
142 public byte getDataStructureType() {
143 return DATA_STRUCTURE_TYPE;
144 }
145
146 public String getJMSXMimeType() {
147 return "jms/bytes-message";
148 }
149
150 /**
151 * Clears out the message body. Clearing a message's body does not clear its
152 * header values or property entries.
153 * <P>
154 * If this message body was read-only, calling this method leaves the
155 * message body in the same state as an empty body in a newly created
156 * message.
157 *
158 * @throws JMSException if the JMS provider fails to clear the message body
159 * due to some internal error.
160 */
161 public void clearBody() throws JMSException {
162 super.clearBody();
163 this.dataOut = null;
164 this.dataIn = null;
165 this.bytesOut = null;
166 }
167
168 /**
169 * Gets the number of bytes of the message body when the message is in
170 * read-only mode. The value returned can be used to allocate a byte array.
171 * The value returned is the entire length of the message body, regardless
172 * of where the pointer for reading the message is currently located.
173 *
174 * @return number of bytes in the message
175 * @throws JMSException if the JMS provider fails to read the message due to
176 * some internal error.
177 * @throws MessageNotReadableException if the message is in write-only mode.
178 * @since 1.1
179 */
180
181 public long getBodyLength() throws JMSException {
182 initializeReading();
183 return length;
184 }
185
186 /**
187 * Reads a <code>boolean</code> from the bytes message stream.
188 *
189 * @return the <code>boolean</code> value read
190 * @throws JMSException if the JMS provider fails to read the message due to
191 * some internal error.
192 * @throws MessageEOFException if unexpected end of bytes stream has been
193 * reached.
194 * @throws MessageNotReadableException if the message is in write-only mode.
195 */
196 public boolean readBoolean() throws JMSException {
197 initializeReading();
198 try {
199 return this.dataIn.readBoolean();
200 } catch (EOFException e) {
201 throw JMSExceptionSupport.createMessageEOFException(e);
202 } catch (IOException e) {
203 throw JMSExceptionSupport.createMessageFormatException(e);
204 }
205 }
206
207 /**
208 * Reads a signed 8-bit value from the bytes message stream.
209 *
210 * @return the next byte from the bytes message stream as a signed 8-bit
211 * <code>byte</code>
212 * @throws JMSException if the JMS provider fails to read the message due to
213 * some internal error.
214 * @throws MessageEOFException if unexpected end of bytes stream has been
215 * reached.
216 * @throws MessageNotReadableException if the message is in write-only mode.
217 */
218 public byte readByte() throws JMSException {
219 initializeReading();
220 try {
221 return this.dataIn.readByte();
222 } catch (EOFException e) {
223 throw JMSExceptionSupport.createMessageEOFException(e);
224 } catch (IOException e) {
225 throw JMSExceptionSupport.createMessageFormatException(e);
226 }
227 }
228
229 /**
230 * Reads an unsigned 8-bit number from the bytes message stream.
231 *
232 * @return the next byte from the bytes message stream, interpreted as an
233 * unsigned 8-bit number
234 * @throws JMSException if the JMS provider fails to read the message due to
235 * some internal error.
236 * @throws MessageEOFException if unexpected end of bytes stream has been
237 * reached.
238 * @throws MessageNotReadableException if the message is in write-only mode.
239 */
240 public int readUnsignedByte() throws JMSException {
241 initializeReading();
242 try {
243 return this.dataIn.readUnsignedByte();
244 } catch (EOFException e) {
245 throw JMSExceptionSupport.createMessageEOFException(e);
246 } catch (IOException e) {
247 throw JMSExceptionSupport.createMessageFormatException(e);
248 }
249 }
250
251 /**
252 * Reads a signed 16-bit number from the bytes message stream.
253 *
254 * @return the next two bytes from the bytes message stream, interpreted as
255 * a signed 16-bit number
256 * @throws JMSException if the JMS provider fails to read the message due to
257 * some internal error.
258 * @throws MessageEOFException if unexpected end of bytes stream has been
259 * reached.
260 * @throws MessageNotReadableException if the message is in write-only mode.
261 */
262 public short readShort() throws JMSException {
263 initializeReading();
264 try {
265 return this.dataIn.readShort();
266 } catch (EOFException e) {
267 throw JMSExceptionSupport.createMessageEOFException(e);
268 } catch (IOException e) {
269 throw JMSExceptionSupport.createMessageFormatException(e);
270 }
271 }
272
273 /**
274 * Reads an unsigned 16-bit number from the bytes message stream.
275 *
276 * @return the next two bytes from the bytes message stream, interpreted as
277 * an unsigned 16-bit integer
278 * @throws JMSException if the JMS provider fails to read the message due to
279 * some internal error.
280 * @throws MessageEOFException if unexpected end of bytes stream has been
281 * reached.
282 * @throws MessageNotReadableException if the message is in write-only mode.
283 */
284 public int readUnsignedShort() throws JMSException {
285 initializeReading();
286 try {
287 return this.dataIn.readUnsignedShort();
288 } catch (EOFException e) {
289 throw JMSExceptionSupport.createMessageEOFException(e);
290 } catch (IOException e) {
291 throw JMSExceptionSupport.createMessageFormatException(e);
292 }
293 }
294
295 /**
296 * Reads a Unicode character value from the bytes message stream.
297 *
298 * @return the next two bytes from the bytes message stream as a Unicode
299 * character
300 * @throws JMSException if the JMS provider fails to read the message due to
301 * some internal error.
302 * @throws MessageEOFException if unexpected end of bytes stream has been
303 * reached.
304 * @throws MessageNotReadableException if the message is in write-only mode.
305 */
306 public char readChar() throws JMSException {
307 initializeReading();
308 try {
309 return this.dataIn.readChar();
310 } catch (EOFException e) {
311 throw JMSExceptionSupport.createMessageEOFException(e);
312 } catch (IOException e) {
313 throw JMSExceptionSupport.createMessageFormatException(e);
314 }
315 }
316
317 /**
318 * Reads a signed 32-bit integer from the bytes message stream.
319 *
320 * @return the next four bytes from the bytes message stream, interpreted as
321 * an <code>int</code>
322 * @throws JMSException if the JMS provider fails to read the message due to
323 * some internal error.
324 * @throws MessageEOFException if unexpected end of bytes stream has been
325 * reached.
326 * @throws MessageNotReadableException if the message is in write-only mode.
327 */
328 public int readInt() throws JMSException {
329 initializeReading();
330 try {
331 return this.dataIn.readInt();
332 } catch (EOFException e) {
333 throw JMSExceptionSupport.createMessageEOFException(e);
334 } catch (IOException e) {
335 throw JMSExceptionSupport.createMessageFormatException(e);
336 }
337 }
338
339 /**
340 * Reads a signed 64-bit integer from the bytes message stream.
341 *
342 * @return the next eight bytes from the bytes message stream, interpreted
343 * as a <code>long</code>
344 * @throws JMSException if the JMS provider fails to read the message due to
345 * some internal error.
346 * @throws MessageEOFException if unexpected end of bytes stream has been
347 * reached.
348 * @throws MessageNotReadableException if the message is in write-only mode.
349 */
350 public long readLong() throws JMSException {
351 initializeReading();
352 try {
353 return this.dataIn.readLong();
354 } catch (EOFException e) {
355 throw JMSExceptionSupport.createMessageEOFException(e);
356 } catch (IOException e) {
357 throw JMSExceptionSupport.createMessageFormatException(e);
358 }
359 }
360
361 /**
362 * Reads a <code>float</code> from the bytes message stream.
363 *
364 * @return the next four bytes from the bytes message stream, interpreted as
365 * a <code>float</code>
366 * @throws JMSException if the JMS provider fails to read the message due to
367 * some internal error.
368 * @throws MessageEOFException if unexpected end of bytes stream has been
369 * reached.
370 * @throws MessageNotReadableException if the message is in write-only mode.
371 */
372 public float readFloat() throws JMSException {
373 initializeReading();
374 try {
375 return this.dataIn.readFloat();
376 } catch (EOFException e) {
377 throw JMSExceptionSupport.createMessageEOFException(e);
378 } catch (IOException e) {
379 throw JMSExceptionSupport.createMessageFormatException(e);
380 }
381 }
382
383 /**
384 * Reads a <code>double</code> from the bytes message stream.
385 *
386 * @return the next eight bytes from the bytes message stream, interpreted
387 * as a <code>double</code>
388 * @throws JMSException if the JMS provider fails to read the message due to
389 * some internal error.
390 * @throws MessageEOFException if unexpected end of bytes stream has been
391 * reached.
392 * @throws MessageNotReadableException if the message is in write-only mode.
393 */
394 public double readDouble() throws JMSException {
395 initializeReading();
396 try {
397 return this.dataIn.readDouble();
398 } catch (EOFException e) {
399 throw JMSExceptionSupport.createMessageEOFException(e);
400 } catch (IOException e) {
401 throw JMSExceptionSupport.createMessageFormatException(e);
402 }
403 }
404
405 /**
406 * Reads a string that has been encoded using a modified UTF-8 format from
407 * the bytes message stream.
408 * <P>
409 * For more information on the UTF-8 format, see "File System Safe UCS
410 * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
411 * X/Open Company Ltd., Document Number: P316. This information also appears
412 * in ISO/IEC 10646, Annex P.
413 *
414 * @return a Unicode string from the bytes message stream
415 * @throws JMSException if the JMS provider fails to read the message due to
416 * some internal error.
417 * @throws MessageEOFException if unexpected end of bytes stream has been
418 * reached.
419 * @throws MessageNotReadableException if the message is in write-only mode.
420 */
421 public String readUTF() throws JMSException {
422 initializeReading();
423 try {
424 return this.dataIn.readUTF();
425 } catch (EOFException e) {
426 throw JMSExceptionSupport.createMessageEOFException(e);
427 } catch (IOException e) {
428 throw JMSExceptionSupport.createMessageFormatException(e);
429 }
430 }
431
432 /**
433 * Reads a byte array from the bytes message stream.
434 * <P>
435 * If the length of array <code>value</code> is less than the number of
436 * bytes remaining to be read from the stream, the array should be filled. A
437 * subsequent call reads the next increment, and so on.
438 * <P>
439 * If the number of bytes remaining in the stream is less than the length of
440 * array <code>value</code>, the bytes should be read into the array. The
441 * return value of the total number of bytes read will be less than the
442 * length of the array, indicating that there are no more bytes left to be
443 * read from the stream. The next read of the stream returns -1.
444 *
445 * @param value the buffer into which the data is read
446 * @return the total number of bytes read into the buffer, or -1 if there is
447 * no more data because the end of the stream has been reached
448 * @throws JMSException if the JMS provider fails to read the message due to
449 * some internal error.
450 * @throws MessageNotReadableException if the message is in write-only mode.
451 */
452 public int readBytes(byte[] value) throws JMSException {
453 return readBytes(value, value.length);
454 }
455
456 /**
457 * Reads a portion of the bytes message stream.
458 * <P>
459 * If the length of array <code>value</code> is less than the number of
460 * bytes remaining to be read from the stream, the array should be filled. A
461 * subsequent call reads the next increment, and so on.
462 * <P>
463 * If the number of bytes remaining in the stream is less than the length of
464 * array <code>value</code>, the bytes should be read into the array. The
465 * return value of the total number of bytes read will be less than the
466 * length of the array, indicating that there are no more bytes left to be
467 * read from the stream. The next read of the stream returns -1. <p/> If
468 * <code>length</code> is negative, or <code>length</code> is greater
469 * than the length of the array <code>value</code>, then an
470 * <code>IndexOutOfBoundsException</code> is thrown. No bytes will be read
471 * from the stream for this exception case.
472 *
473 * @param value the buffer into which the data is read
474 * @param length the number of bytes to read; must be less than or equal to
475 * <code>value.length</code>
476 * @return the total number of bytes read into the buffer, or -1 if there is
477 * no more data because the end of the stream has been reached
478 * @throws JMSException if the JMS provider fails to read the message due to
479 * some internal error.
480 * @throws MessageNotReadableException if the message is in write-only mode.
481 */
482 public int readBytes(byte[] value, int length) throws JMSException {
483 initializeReading();
484 try {
485 int n = 0;
486 while (n < length) {
487 int count = this.dataIn.read(value, n, length - n);
488 if (count < 0) {
489 break;
490 }
491 n += count;
492 }
493 if (n == 0 && length > 0) {
494 n = -1;
495 }
496 return n;
497 } catch (EOFException e) {
498 throw JMSExceptionSupport.createMessageEOFException(e);
499 } catch (IOException e) {
500 throw JMSExceptionSupport.createMessageFormatException(e);
501 }
502 }
503
504 /**
505 * Writes a <code>boolean</code> to the bytes message stream as a 1-byte
506 * value. The value <code>true</code> is written as the value
507 * <code>(byte)1</code>; the value <code>false</code> is written as the
508 * value <code>(byte)0</code>.
509 *
510 * @param value the <code>boolean</code> value to be written
511 * @throws JMSException if the JMS provider fails to write the message due
512 * to some internal error.
513 * @throws MessageNotWriteableException if the message is in read-only mode.
514 */
515 public void writeBoolean(boolean value) throws JMSException {
516 initializeWriting();
517 try {
518 this.dataOut.writeBoolean(value);
519 } catch (IOException ioe) {
520 throw JMSExceptionSupport.create(ioe);
521 }
522 }
523
524 /**
525 * Writes a <code>byte</code> to the bytes message stream as a 1-byte
526 * value.
527 *
528 * @param value the <code>byte</code> value to be written
529 * @throws JMSException if the JMS provider fails to write the message due
530 * to some internal error.
531 * @throws MessageNotWriteableException if the message is in read-only mode.
532 */
533 public void writeByte(byte value) throws JMSException {
534 initializeWriting();
535 try {
536 this.dataOut.writeByte(value);
537 } catch (IOException ioe) {
538 throw JMSExceptionSupport.create(ioe);
539 }
540 }
541
542 /**
543 * Writes a <code>short</code> to the bytes message stream as two bytes,
544 * high byte first.
545 *
546 * @param value the <code>short</code> to be written
547 * @throws JMSException if the JMS provider fails to write the message due
548 * to some internal error.
549 * @throws MessageNotWriteableException if the message is in read-only mode.
550 */
551 public void writeShort(short value) throws JMSException {
552 initializeWriting();
553 try {
554 this.dataOut.writeShort(value);
555 } catch (IOException ioe) {
556 throw JMSExceptionSupport.create(ioe);
557 }
558 }
559
560 /**
561 * Writes a <code>char</code> to the bytes message stream as a 2-byte
562 * value, high byte first.
563 *
564 * @param value the <code>char</code> value to be written
565 * @throws JMSException if the JMS provider fails to write the message due
566 * to some internal error.
567 * @throws MessageNotWriteableException if the message is in read-only mode.
568 */
569 public void writeChar(char value) throws JMSException {
570 initializeWriting();
571 try {
572 this.dataOut.writeChar(value);
573 } catch (IOException ioe) {
574 throw JMSExceptionSupport.create(ioe);
575 }
576 }
577
578 /**
579 * Writes an <code>int</code> to the bytes message stream as four bytes,
580 * high byte first.
581 *
582 * @param value the <code>int</code> to be written
583 * @throws JMSException if the JMS provider fails to write the message due
584 * to some internal error.
585 * @throws MessageNotWriteableException if the message is in read-only mode.
586 */
587 public void writeInt(int value) throws JMSException {
588 initializeWriting();
589 try {
590 this.dataOut.writeInt(value);
591 } catch (IOException ioe) {
592 throw JMSExceptionSupport.create(ioe);
593 }
594 }
595
596 /**
597 * Writes a <code>long</code> to the bytes message stream as eight bytes,
598 * high byte first.
599 *
600 * @param value the <code>long</code> to be written
601 * @throws JMSException if the JMS provider fails to write the message due
602 * to some internal error.
603 * @throws MessageNotWriteableException if the message is in read-only mode.
604 */
605 public void writeLong(long value) throws JMSException {
606 initializeWriting();
607 try {
608 this.dataOut.writeLong(value);
609 } catch (IOException ioe) {
610 throw JMSExceptionSupport.create(ioe);
611 }
612 }
613
614 /**
615 * Converts the <code>float</code> argument to an <code>int</code> using
616 * the <code>floatToIntBits</code> method in class <code>Float</code>,
617 * and then writes that <code>int</code> value to the bytes message stream
618 * as a 4-byte quantity, high byte first.
619 *
620 * @param value the <code>float</code> value to be written
621 * @throws JMSException if the JMS provider fails to write the message due
622 * to some internal error.
623 * @throws MessageNotWriteableException if the message is in read-only mode.
624 */
625 public void writeFloat(float value) throws JMSException {
626 initializeWriting();
627 try {
628 this.dataOut.writeFloat(value);
629 } catch (IOException ioe) {
630 throw JMSExceptionSupport.create(ioe);
631 }
632 }
633
634 /**
635 * Converts the <code>double</code> argument to a <code>long</code>
636 * using the <code>doubleToLongBits</code> method in class
637 * <code>Double</code>, and then writes that <code>long</code> value to
638 * the bytes message stream as an 8-byte quantity, high byte first.
639 *
640 * @param value the <code>double</code> value to be written
641 * @throws JMSException if the JMS provider fails to write the message due
642 * to some internal error.
643 * @throws MessageNotWriteableException if the message is in read-only mode.
644 */
645 public void writeDouble(double value) throws JMSException {
646 initializeWriting();
647 try {
648 this.dataOut.writeDouble(value);
649 } catch (IOException ioe) {
650 throw JMSExceptionSupport.create(ioe);
651 }
652 }
653
654 /**
655 * Writes a string to the bytes message stream using UTF-8 encoding in a
656 * machine-independent manner.
657 * <P>
658 * For more information on the UTF-8 format, see "File System Safe UCS
659 * Transformation Format (FSS_UTF)", X/Open Preliminary Specification,
660 * X/Open Company Ltd., Document Number: P316. This information also appears
661 * in ISO/IEC 10646, Annex P.
662 *
663 * @param value the <code>String</code> value to be written
664 * @throws JMSException if the JMS provider fails to write the message due
665 * to some internal error.
666 * @throws MessageNotWriteableException if the message is in read-only mode.
667 */
668 public void writeUTF(String value) throws JMSException {
669 initializeWriting();
670 try {
671 this.dataOut.writeUTF(value);
672 } catch (IOException ioe) {
673 throw JMSExceptionSupport.create(ioe);
674 }
675 }
676
677 /**
678 * Writes a byte array to the bytes message stream.
679 *
680 * @param value the byte array to be written
681 * @throws JMSException if the JMS provider fails to write the message due
682 * to some internal error.
683 * @throws MessageNotWriteableException if the message is in read-only mode.
684 */
685 public void writeBytes(byte[] value) throws JMSException {
686 initializeWriting();
687 try {
688 this.dataOut.write(value);
689 } catch (IOException ioe) {
690 throw JMSExceptionSupport.create(ioe);
691 }
692 }
693
694 /**
695 * Writes a portion of a byte array to the bytes message stream.
696 *
697 * @param value the byte array value to be written
698 * @param offset the initial offset within the byte array
699 * @param length the number of bytes to use
700 * @throws JMSException if the JMS provider fails to write the message due
701 * to some internal error.
702 * @throws MessageNotWriteableException if the message is in read-only mode.
703 */
704 public void writeBytes(byte[] value, int offset, int length) throws JMSException {
705 initializeWriting();
706 try {
707 this.dataOut.write(value, offset, length);
708 } catch (IOException ioe) {
709 throw JMSExceptionSupport.create(ioe);
710 }
711 }
712
713 /**
714 * Writes an object to the bytes message stream.
715 * <P>
716 * This method works only for the objectified primitive object types (<code>Integer</code>,<code>Double</code>,
717 * <code>Long</code> ...), <code>String</code> objects, and byte
718 * arrays.
719 *
720 * @param value the object in the Java programming language ("Java object")
721 * to be written; it must not be null
722 * @throws JMSException if the JMS provider fails to write the message due
723 * to some internal error.
724 * @throws MessageFormatException if the object is of an invalid type.
725 * @throws MessageNotWriteableException if the message is in read-only mode.
726 * @throws java.lang.NullPointerException if the parameter
727 * <code>value</code> is null.
728 */
729 public void writeObject(Object value) throws JMSException {
730 if (value == null) {
731 throw new NullPointerException();
732 }
733 initializeWriting();
734 if (value instanceof Boolean) {
735 writeBoolean(((Boolean)value).booleanValue());
736 } else if (value instanceof Character) {
737 writeChar(((Character)value).charValue());
738 } else if (value instanceof Byte) {
739 writeByte(((Byte)value).byteValue());
740 } else if (value instanceof Short) {
741 writeShort(((Short)value).shortValue());
742 } else if (value instanceof Integer) {
743 writeInt(((Integer)value).intValue());
744 } else if (value instanceof Long) {
745 writeLong(((Long)value).longValue());
746 } else if (value instanceof Float) {
747 writeFloat(((Float)value).floatValue());
748 } else if (value instanceof Double) {
749 writeDouble(((Double)value).doubleValue());
750 } else if (value instanceof String) {
751 writeUTF(value.toString());
752 } else if (value instanceof byte[]) {
753 writeBytes((byte[])value);
754 } else {
755 throw new MessageFormatException("Cannot write non-primitive type:" + value.getClass());
756 }
757 }
758
759 /**
760 * Puts the message body in read-only mode and repositions the stream of
761 * bytes to the beginning.
762 *
763 * @throws JMSException if an internal error occurs
764 */
765 public void reset() throws JMSException {
766 storeContent();
767 this.bytesOut = null;
768 this.dataIn = null;
769 this.dataOut = null;
770 setReadOnlyBody(true);
771 }
772
773 private void initializeWriting() throws JMSException {
774 checkReadOnlyBody();
775 if (this.dataOut == null) {
776 this.bytesOut = new ByteArrayOutputStream();
777 OutputStream os = bytesOut;
778 ActiveMQConnection connection = getConnection();
779 if (connection != null && connection.isUseCompression()) {
780 // keep track of the real length of the content if
781 // we are compressed.
782 try {
783 os.write(new byte[4]);
784 } catch (IOException e) {
785 throw JMSExceptionSupport.create(e);
786 }
787 length = 0;
788 compressed = true;
789 final Deflater deflater = new Deflater(Deflater.BEST_SPEED);
790 os = new FilterOutputStream(new DeflaterOutputStream(os, deflater)) {
791 public void write(byte[] arg0) throws IOException {
792 length += arg0.length;
793 out.write(arg0);
794 }
795
796 public void write(byte[] arg0, int arg1, int arg2) throws IOException {
797 length += arg2;
798 out.write(arg0, arg1, arg2);
799 }
800
801 public void write(int arg0) throws IOException {
802 length++;
803 out.write(arg0);
804 }
805
806 @Override
807 public void close() throws IOException {
808 super.close();
809 deflater.end();
810 }
811 };
812 }
813 this.dataOut = new DataOutputStream(os);
814 }
815 }
816
817 protected void checkWriteOnlyBody() throws MessageNotReadableException {
818 if (!readOnlyBody) {
819 throw new MessageNotReadableException("Message body is write-only");
820 }
821 }
822
823 private void initializeReading() throws JMSException {
824 checkWriteOnlyBody();
825 if (dataIn == null) {
826 ByteSequence data = getContent();
827 if (data == null) {
828 data = new ByteSequence(new byte[] {}, 0, 0);
829 }
830 InputStream is = new ByteArrayInputStream(data);
831 if (isCompressed()) {
832 // keep track of the real length of the content if
833 // we are compressed.
834 try {
835 DataInputStream dis = new DataInputStream(is);
836 length = dis.readInt();
837 dis.close();
838 } catch (IOException e) {
839 throw JMSExceptionSupport.create(e);
840 }
841 is = new InflaterInputStream(is);
842 } else {
843 length = data.getLength();
844 }
845 dataIn = new DataInputStream(is);
846 }
847 }
848
849 public void setObjectProperty(String name, Object value) throws JMSException {
850 initializeWriting();
851 super.setObjectProperty(name, value);
852 }
853
854 public String toString() {
855 return super.toString() + " ActiveMQBytesMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
856 }
857
858 @Override
859 protected void doCompress() throws IOException {
860 compressed = true;
861 ByteSequence bytes = getContent();
862 int length = bytes.getLength();
863 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
864 bytesOut.write(new byte[4]);
865 DeflaterOutputStream os = new DeflaterOutputStream(bytesOut);
866 DataOutputStream dataOut = new DataOutputStream(os);
867 dataOut.write(bytes.data, bytes.offset, bytes.length);
868 dataOut.flush();
869 dataOut.close();
870 bytes = bytesOut.toByteSequence();
871 ByteSequenceData.writeIntBig(bytes, length);
872 bytes.offset = 0;
873 setContent(bytes);
874 }
875 }