001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.command;
019
020 import java.io.BufferedInputStream;
021 import java.io.DataInputStream;
022 import java.io.DataOutputStream;
023 import java.io.EOFException;
024 import java.io.IOException;
025 import java.io.InputStream;
026 import java.io.OutputStream;
027 import java.util.zip.DeflaterOutputStream;
028 import java.util.zip.InflaterInputStream;
029
030 import javax.jms.JMSException;
031 import javax.jms.MessageEOFException;
032 import javax.jms.MessageFormatException;
033 import javax.jms.MessageNotReadableException;
034 import javax.jms.MessageNotWriteableException;
035 import javax.jms.StreamMessage;
036
037 import org.apache.activemq.ActiveMQConnection;
038 import org.apache.activemq.util.ByteArrayInputStream;
039 import org.apache.activemq.util.ByteArrayOutputStream;
040 import org.apache.activemq.util.ByteSequence;
041 import org.apache.activemq.util.JMSExceptionSupport;
042 import org.apache.activemq.util.MarshallingSupport;
043
044 /**
045 * A <CODE>StreamMessage</CODE> object is used to send a stream of primitive
046 * types in the Java programming language. It is filled and read sequentially.
047 * It inherits from the <CODE>Message</CODE> interface and adds a stream
048 * message body. Its methods are based largely on those found in
049 * <CODE>java.io.DataInputStream</CODE> and
050 * <CODE>java.io.DataOutputStream</CODE>. <p/>
051 * <P>
052 * The primitive types can be read or written explicitly using methods for each
053 * type. They may also be read or written generically as objects. For instance,
054 * a call to <CODE>StreamMessage.writeInt(6)</CODE> is equivalent to
055 * <CODE>StreamMessage.writeObject(new
056 * Integer(6))</CODE>. Both forms are
057 * provided, because the explicit form is convenient for static programming, and
058 * the object form is needed when types are not known at compile time. <p/>
059 * <P>
060 * When the message is first created, and when <CODE>clearBody</CODE> is
061 * called, the body of the message is in write-only mode. After the first call
062 * to <CODE>reset</CODE> has been made, the message body is in read-only mode.
063 * After a message has been sent, the client that sent it can retain and modify
064 * it without affecting the message that has been sent. The same message object
065 * can be sent multiple times. When a message has been received, the provider
066 * has called <CODE>reset</CODE> so that the message body is in read-only mode
067 * for the client. <p/>
068 * <P>
069 * If <CODE>clearBody</CODE> is called on a message in read-only mode, the
070 * message body is cleared and the message body is in write-only mode. <p/>
071 * <P>
072 * If a client attempts to read a message in write-only mode, a
073 * <CODE>MessageNotReadableException</CODE> is thrown. <p/>
074 * <P>
075 * If a client attempts to write a message in read-only mode, a
076 * <CODE>MessageNotWriteableException</CODE> is thrown. <p/>
077 * <P>
078 * <CODE>StreamMessage</CODE> objects support the following conversion table.
079 * The marked cases must be supported. The unmarked cases must throw a
080 * <CODE>JMSException</CODE>. The <CODE>String</CODE>-to-primitive
081 * conversions may throw a runtime exception if the primitive's
082 * <CODE>valueOf()</CODE> method does not accept it as a valid
083 * <CODE>String</CODE> representation of the primitive. <p/>
084 * <P>
085 * A value written as the row type can be read as the column type. <p/>
086 *
087 * <PRE>
 * |        | boolean byte short char int long float double String byte[]
 * |----------------------------------------------------------------------
 * |boolean |    X                                            X
 * |byte    |          X     X         X   X                  X
 * |short   |                X         X   X                  X
 * |char    |                     X                           X
 * |int     |                          X   X                  X
 * |long    |                              X                  X
 * |float   |                                    X     X      X
 * |double  |                                          X      X
 * |String  |    X     X     X         X   X     X     X      X
 * |byte[]  |                                                        X
 * |----------------------------------------------------------------------
093 *
094 * </PRE>
095 *
096 * <p/>
097 * <P>
098 * Attempting to read a null value as a primitive type must be treated as
099 * calling the primitive's corresponding <code>valueOf(String)</code>
100 * conversion method with a null value. Since <code>char</code> does not
101 * support a <code>String</code> conversion, attempting to read a null value
102 * as a <code>char</code> must throw a <code>NullPointerException</code>.
103 *
104 * @openwire:marshaller code="27"
105 * @see javax.jms.Session#createStreamMessage()
106 * @see javax.jms.BytesMessage
107 * @see javax.jms.MapMessage
108 * @see javax.jms.Message
109 * @see javax.jms.ObjectMessage
110 * @see javax.jms.TextMessage
111 */
112 public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
113
114 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_STREAM_MESSAGE;
115
116 protected transient DataOutputStream dataOut;
117 protected transient ByteArrayOutputStream bytesOut;
118 protected transient DataInputStream dataIn;
119 protected transient int remainingBytes = -1;
120
121 public Message copy() {
122 ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
123 copy(copy);
124 return copy;
125 }
126
127 private void copy(ActiveMQStreamMessage copy) {
128 storeContent();
129 super.copy(copy);
130 copy.dataOut = null;
131 copy.bytesOut = null;
132 copy.dataIn = null;
133 }
134
135 public void onSend() throws JMSException {
136 super.onSend();
137 storeContent();
138 }
139
140 @Override
141 public void storeContent() {
142 if (dataOut != null) {
143 try {
144 dataOut.close();
145 setContent(bytesOut.toByteSequence());
146 bytesOut = null;
147 dataOut = null;
148 } catch (IOException ioe) {
149 throw new RuntimeException(ioe);
150 }
151 }
152 }
153
154 public byte getDataStructureType() {
155 return DATA_STRUCTURE_TYPE;
156 }
157
158 public String getJMSXMimeType() {
159 return "jms/stream-message";
160 }
161
162 /**
163 * Clears out the message body. Clearing a message's body does not clear its
164 * header values or property entries. <p/>
165 * <P>
166 * If this message body was read-only, calling this method leaves the
167 * message body in the same state as an empty body in a newly created
168 * message.
169 *
170 * @throws JMSException if the JMS provider fails to clear the message body
171 * due to some internal error.
172 */
173
174 public void clearBody() throws JMSException {
175 super.clearBody();
176 this.dataOut = null;
177 this.dataIn = null;
178 this.bytesOut = null;
179 this.remainingBytes = -1;
180 }
181
182 /**
183 * Reads a <code>boolean</code> from the stream message.
184 *
185 * @return the <code>boolean</code> value read
186 * @throws JMSException if the JMS provider fails to read the message due to
187 * some internal error.
188 * @throws MessageEOFException if unexpected end of message stream has been
189 * reached.
190 * @throws MessageFormatException if this type conversion is invalid.
191 * @throws MessageNotReadableException if the message is in write-only mode.
192 */
193
194 public boolean readBoolean() throws JMSException {
195 initializeReading();
196 try {
197
198 this.dataIn.mark(10);
199 int type = this.dataIn.read();
200 if (type == -1) {
201 throw new MessageEOFException("reached end of data");
202 }
203 if (type == MarshallingSupport.BOOLEAN_TYPE) {
204 return this.dataIn.readBoolean();
205 }
206 if (type == MarshallingSupport.STRING_TYPE) {
207 return Boolean.valueOf(this.dataIn.readUTF()).booleanValue();
208 }
209 if (type == MarshallingSupport.NULL) {
210 this.dataIn.reset();
211 throw new NullPointerException("Cannot convert NULL value to boolean.");
212 } else {
213 this.dataIn.reset();
214 throw new MessageFormatException(" not a boolean type");
215 }
216 } catch (EOFException e) {
217 throw JMSExceptionSupport.createMessageEOFException(e);
218 } catch (IOException e) {
219 throw JMSExceptionSupport.createMessageFormatException(e);
220 }
221 }
222
223 /**
224 * Reads a <code>byte</code> value from the stream message.
225 *
226 * @return the next byte from the stream message as a 8-bit
227 * <code>byte</code>
228 * @throws JMSException if the JMS provider fails to read the message due to
229 * some internal error.
230 * @throws MessageEOFException if unexpected end of message stream has been
231 * reached.
232 * @throws MessageFormatException if this type conversion is invalid.
233 * @throws MessageNotReadableException if the message is in write-only mode.
234 */
235
236 public byte readByte() throws JMSException {
237 initializeReading();
238 try {
239
240 this.dataIn.mark(10);
241 int type = this.dataIn.read();
242 if (type == -1) {
243 throw new MessageEOFException("reached end of data");
244 }
245 if (type == MarshallingSupport.BYTE_TYPE) {
246 return this.dataIn.readByte();
247 }
248 if (type == MarshallingSupport.STRING_TYPE) {
249 return Byte.valueOf(this.dataIn.readUTF()).byteValue();
250 }
251 if (type == MarshallingSupport.NULL) {
252 this.dataIn.reset();
253 throw new NullPointerException("Cannot convert NULL value to byte.");
254 } else {
255 this.dataIn.reset();
256 throw new MessageFormatException(" not a byte type");
257 }
258 } catch (NumberFormatException mfe) {
259 try {
260 this.dataIn.reset();
261 } catch (IOException ioe) {
262 throw JMSExceptionSupport.create(ioe);
263 }
264 throw mfe;
265
266 } catch (EOFException e) {
267 throw JMSExceptionSupport.createMessageEOFException(e);
268 } catch (IOException e) {
269 throw JMSExceptionSupport.createMessageFormatException(e);
270 }
271 }
272
273 /**
274 * Reads a 16-bit integer from the stream message.
275 *
276 * @return a 16-bit integer from the stream message
277 * @throws JMSException if the JMS provider fails to read the message due to
278 * some internal error.
279 * @throws MessageEOFException if unexpected end of message stream has been
280 * reached.
281 * @throws MessageFormatException if this type conversion is invalid.
282 * @throws MessageNotReadableException if the message is in write-only mode.
283 */
284
285 public short readShort() throws JMSException {
286 initializeReading();
287 try {
288
289 this.dataIn.mark(17);
290 int type = this.dataIn.read();
291 if (type == -1) {
292 throw new MessageEOFException("reached end of data");
293 }
294 if (type == MarshallingSupport.SHORT_TYPE) {
295 return this.dataIn.readShort();
296 }
297 if (type == MarshallingSupport.BYTE_TYPE) {
298 return this.dataIn.readByte();
299 }
300 if (type == MarshallingSupport.STRING_TYPE) {
301 return Short.valueOf(this.dataIn.readUTF()).shortValue();
302 }
303 if (type == MarshallingSupport.NULL) {
304 this.dataIn.reset();
305 throw new NullPointerException("Cannot convert NULL value to short.");
306 } else {
307 this.dataIn.reset();
308 throw new MessageFormatException(" not a short type");
309 }
310 } catch (NumberFormatException mfe) {
311 try {
312 this.dataIn.reset();
313 } catch (IOException ioe) {
314 throw JMSExceptionSupport.create(ioe);
315 }
316 throw mfe;
317
318 } catch (EOFException e) {
319 throw JMSExceptionSupport.createMessageEOFException(e);
320 } catch (IOException e) {
321 throw JMSExceptionSupport.createMessageFormatException(e);
322 }
323
324 }
325
326 /**
327 * Reads a Unicode character value from the stream message.
328 *
329 * @return a Unicode character from the stream message
330 * @throws JMSException if the JMS provider fails to read the message due to
331 * some internal error.
332 * @throws MessageEOFException if unexpected end of message stream has been
333 * reached.
334 * @throws MessageFormatException if this type conversion is invalid
335 * @throws MessageNotReadableException if the message is in write-only mode.
336 */
337
338 public char readChar() throws JMSException {
339 initializeReading();
340 try {
341
342 this.dataIn.mark(17);
343 int type = this.dataIn.read();
344 if (type == -1) {
345 throw new MessageEOFException("reached end of data");
346 }
347 if (type == MarshallingSupport.CHAR_TYPE) {
348 return this.dataIn.readChar();
349 }
350 if (type == MarshallingSupport.NULL) {
351 this.dataIn.reset();
352 throw new NullPointerException("Cannot convert NULL value to char.");
353 } else {
354 this.dataIn.reset();
355 throw new MessageFormatException(" not a char type");
356 }
357 } catch (NumberFormatException mfe) {
358 try {
359 this.dataIn.reset();
360 } catch (IOException ioe) {
361 throw JMSExceptionSupport.create(ioe);
362 }
363 throw mfe;
364
365 } catch (EOFException e) {
366 throw JMSExceptionSupport.createMessageEOFException(e);
367 } catch (IOException e) {
368 throw JMSExceptionSupport.createMessageFormatException(e);
369 }
370 }
371
372 /**
373 * Reads a 32-bit integer from the stream message.
374 *
375 * @return a 32-bit integer value from the stream message, interpreted as an
376 * <code>int</code>
377 * @throws JMSException if the JMS provider fails to read the message due to
378 * some internal error.
379 * @throws MessageEOFException if unexpected end of message stream has been
380 * reached.
381 * @throws MessageFormatException if this type conversion is invalid.
382 * @throws MessageNotReadableException if the message is in write-only mode.
383 */
384
385 public int readInt() throws JMSException {
386 initializeReading();
387 try {
388
389 this.dataIn.mark(33);
390 int type = this.dataIn.read();
391 if (type == -1) {
392 throw new MessageEOFException("reached end of data");
393 }
394 if (type == MarshallingSupport.INTEGER_TYPE) {
395 return this.dataIn.readInt();
396 }
397 if (type == MarshallingSupport.SHORT_TYPE) {
398 return this.dataIn.readShort();
399 }
400 if (type == MarshallingSupport.BYTE_TYPE) {
401 return this.dataIn.readByte();
402 }
403 if (type == MarshallingSupport.STRING_TYPE) {
404 return Integer.valueOf(this.dataIn.readUTF()).intValue();
405 }
406 if (type == MarshallingSupport.NULL) {
407 this.dataIn.reset();
408 throw new NullPointerException("Cannot convert NULL value to int.");
409 } else {
410 this.dataIn.reset();
411 throw new MessageFormatException(" not an int type");
412 }
413 } catch (NumberFormatException mfe) {
414 try {
415 this.dataIn.reset();
416 } catch (IOException ioe) {
417 throw JMSExceptionSupport.create(ioe);
418 }
419 throw mfe;
420
421 } catch (EOFException e) {
422 throw JMSExceptionSupport.createMessageEOFException(e);
423 } catch (IOException e) {
424 throw JMSExceptionSupport.createMessageFormatException(e);
425 }
426 }
427
428 /**
429 * Reads a 64-bit integer from the stream message.
430 *
431 * @return a 64-bit integer value from the stream message, interpreted as a
432 * <code>long</code>
433 * @throws JMSException if the JMS provider fails to read the message due to
434 * some internal error.
435 * @throws MessageEOFException if unexpected end of message stream has been
436 * reached.
437 * @throws MessageFormatException if this type conversion is invalid.
438 * @throws MessageNotReadableException if the message is in write-only mode.
439 */
440
441 public long readLong() throws JMSException {
442 initializeReading();
443 try {
444
445 this.dataIn.mark(65);
446 int type = this.dataIn.read();
447 if (type == -1) {
448 throw new MessageEOFException("reached end of data");
449 }
450 if (type == MarshallingSupport.LONG_TYPE) {
451 return this.dataIn.readLong();
452 }
453 if (type == MarshallingSupport.INTEGER_TYPE) {
454 return this.dataIn.readInt();
455 }
456 if (type == MarshallingSupport.SHORT_TYPE) {
457 return this.dataIn.readShort();
458 }
459 if (type == MarshallingSupport.BYTE_TYPE) {
460 return this.dataIn.readByte();
461 }
462 if (type == MarshallingSupport.STRING_TYPE) {
463 return Long.valueOf(this.dataIn.readUTF()).longValue();
464 }
465 if (type == MarshallingSupport.NULL) {
466 this.dataIn.reset();
467 throw new NullPointerException("Cannot convert NULL value to long.");
468 } else {
469 this.dataIn.reset();
470 throw new MessageFormatException(" not a long type");
471 }
472 } catch (NumberFormatException mfe) {
473 try {
474 this.dataIn.reset();
475 } catch (IOException ioe) {
476 throw JMSExceptionSupport.create(ioe);
477 }
478 throw mfe;
479
480 } catch (EOFException e) {
481 throw JMSExceptionSupport.createMessageEOFException(e);
482 } catch (IOException e) {
483 throw JMSExceptionSupport.createMessageFormatException(e);
484 }
485 }
486
487 /**
488 * Reads a <code>float</code> from the stream message.
489 *
490 * @return a <code>float</code> value from the stream message
491 * @throws JMSException if the JMS provider fails to read the message due to
492 * some internal error.
493 * @throws MessageEOFException if unexpected end of message stream has been
494 * reached.
495 * @throws MessageFormatException if this type conversion is invalid.
496 * @throws MessageNotReadableException if the message is in write-only mode.
497 */
498
499 public float readFloat() throws JMSException {
500 initializeReading();
501 try {
502 this.dataIn.mark(33);
503 int type = this.dataIn.read();
504 if (type == -1) {
505 throw new MessageEOFException("reached end of data");
506 }
507 if (type == MarshallingSupport.FLOAT_TYPE) {
508 return this.dataIn.readFloat();
509 }
510 if (type == MarshallingSupport.STRING_TYPE) {
511 return Float.valueOf(this.dataIn.readUTF()).floatValue();
512 }
513 if (type == MarshallingSupport.NULL) {
514 this.dataIn.reset();
515 throw new NullPointerException("Cannot convert NULL value to float.");
516 } else {
517 this.dataIn.reset();
518 throw new MessageFormatException(" not a float type");
519 }
520 } catch (NumberFormatException mfe) {
521 try {
522 this.dataIn.reset();
523 } catch (IOException ioe) {
524 throw JMSExceptionSupport.create(ioe);
525 }
526 throw mfe;
527
528 } catch (EOFException e) {
529 throw JMSExceptionSupport.createMessageEOFException(e);
530 } catch (IOException e) {
531 throw JMSExceptionSupport.createMessageFormatException(e);
532 }
533 }
534
535 /**
536 * Reads a <code>double</code> from the stream message.
537 *
538 * @return a <code>double</code> value from the stream message
539 * @throws JMSException if the JMS provider fails to read the message due to
540 * some internal error.
541 * @throws MessageEOFException if unexpected end of message stream has been
542 * reached.
543 * @throws MessageFormatException if this type conversion is invalid.
544 * @throws MessageNotReadableException if the message is in write-only mode.
545 */
546
547 public double readDouble() throws JMSException {
548 initializeReading();
549 try {
550
551 this.dataIn.mark(65);
552 int type = this.dataIn.read();
553 if (type == -1) {
554 throw new MessageEOFException("reached end of data");
555 }
556 if (type == MarshallingSupport.DOUBLE_TYPE) {
557 return this.dataIn.readDouble();
558 }
559 if (type == MarshallingSupport.FLOAT_TYPE) {
560 return this.dataIn.readFloat();
561 }
562 if (type == MarshallingSupport.STRING_TYPE) {
563 return Double.valueOf(this.dataIn.readUTF()).doubleValue();
564 }
565 if (type == MarshallingSupport.NULL) {
566 this.dataIn.reset();
567 throw new NullPointerException("Cannot convert NULL value to double.");
568 } else {
569 this.dataIn.reset();
570 throw new MessageFormatException(" not a double type");
571 }
572 } catch (NumberFormatException mfe) {
573 try {
574 this.dataIn.reset();
575 } catch (IOException ioe) {
576 throw JMSExceptionSupport.create(ioe);
577 }
578 throw mfe;
579
580 } catch (EOFException e) {
581 throw JMSExceptionSupport.createMessageEOFException(e);
582 } catch (IOException e) {
583 throw JMSExceptionSupport.createMessageFormatException(e);
584 }
585 }
586
587 /**
588 * Reads a <CODE>String</CODE> from the stream message.
589 *
590 * @return a Unicode string from the stream message
591 * @throws JMSException if the JMS provider fails to read the message due to
592 * some internal error.
593 * @throws MessageEOFException if unexpected end of message stream has been
594 * reached.
595 * @throws MessageFormatException if this type conversion is invalid.
596 * @throws MessageNotReadableException if the message is in write-only mode.
597 */
598
599 public String readString() throws JMSException {
600 initializeReading();
601 try {
602
603 this.dataIn.mark(65);
604 int type = this.dataIn.read();
605 if (type == -1) {
606 throw new MessageEOFException("reached end of data");
607 }
608 if (type == MarshallingSupport.NULL) {
609 return null;
610 }
611 if (type == MarshallingSupport.BIG_STRING_TYPE) {
612 return MarshallingSupport.readUTF8(dataIn);
613 }
614 if (type == MarshallingSupport.STRING_TYPE) {
615 return this.dataIn.readUTF();
616 }
617 if (type == MarshallingSupport.LONG_TYPE) {
618 return new Long(this.dataIn.readLong()).toString();
619 }
620 if (type == MarshallingSupport.INTEGER_TYPE) {
621 return new Integer(this.dataIn.readInt()).toString();
622 }
623 if (type == MarshallingSupport.SHORT_TYPE) {
624 return new Short(this.dataIn.readShort()).toString();
625 }
626 if (type == MarshallingSupport.BYTE_TYPE) {
627 return new Byte(this.dataIn.readByte()).toString();
628 }
629 if (type == MarshallingSupport.FLOAT_TYPE) {
630 return new Float(this.dataIn.readFloat()).toString();
631 }
632 if (type == MarshallingSupport.DOUBLE_TYPE) {
633 return new Double(this.dataIn.readDouble()).toString();
634 }
635 if (type == MarshallingSupport.BOOLEAN_TYPE) {
636 return (this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE).toString();
637 }
638 if (type == MarshallingSupport.CHAR_TYPE) {
639 return new Character(this.dataIn.readChar()).toString();
640 } else {
641 this.dataIn.reset();
642 throw new MessageFormatException(" not a String type");
643 }
644 } catch (NumberFormatException mfe) {
645 try {
646 this.dataIn.reset();
647 } catch (IOException ioe) {
648 throw JMSExceptionSupport.create(ioe);
649 }
650 throw mfe;
651
652 } catch (EOFException e) {
653 throw JMSExceptionSupport.createMessageEOFException(e);
654 } catch (IOException e) {
655 throw JMSExceptionSupport.createMessageFormatException(e);
656 }
657 }
658
659 /**
660 * Reads a byte array field from the stream message into the specified
661 * <CODE>byte[]</CODE> object (the read buffer). <p/>
662 * <P>
663 * To read the field value, <CODE>readBytes</CODE> should be successively
664 * called until it returns a value less than the length of the read buffer.
665 * The value of the bytes in the buffer following the last byte read is
666 * undefined. <p/>
667 * <P>
668 * If <CODE>readBytes</CODE> returns a value equal to the length of the
669 * buffer, a subsequent <CODE>readBytes</CODE> call must be made. If there
670 * are no more bytes to be read, this call returns -1. <p/>
671 * <P>
672 * If the byte array field value is null, <CODE>readBytes</CODE> returns
673 * -1. <p/>
674 * <P>
675 * If the byte array field value is empty, <CODE>readBytes</CODE> returns
676 * 0. <p/>
677 * <P>
678 * Once the first <CODE>readBytes</CODE> call on a <CODE>byte[]</CODE>
679 * field value has been made, the full value of the field must be read
680 * before it is valid to read the next field. An attempt to read the next
681 * field before that has been done will throw a
682 * <CODE>MessageFormatException</CODE>. <p/>
683 * <P>
684 * To read the byte field value into a new <CODE>byte[]</CODE> object, use
685 * the <CODE>readObject</CODE> method.
686 *
687 * @param value the buffer into which the data is read
688 * @return the total number of bytes read into the buffer, or -1 if there is
689 * no more data because the end of the byte field has been reached
690 * @throws JMSException if the JMS provider fails to read the message due to
691 * some internal error.
692 * @throws MessageEOFException if unexpected end of message stream has been
693 * reached.
694 * @throws MessageFormatException if this type conversion is invalid.
695 * @throws MessageNotReadableException if the message is in write-only mode.
696 * @see #readObject()
697 */
698
699 public int readBytes(byte[] value) throws JMSException {
700
701 initializeReading();
702 try {
703 if (value == null) {
704 throw new NullPointerException();
705 }
706
707 if (remainingBytes == -1) {
708 this.dataIn.mark(value.length + 1);
709 int type = this.dataIn.read();
710 if (type == -1) {
711 throw new MessageEOFException("reached end of data");
712 }
713 if (type != MarshallingSupport.BYTE_ARRAY_TYPE) {
714 throw new MessageFormatException("Not a byte array");
715 }
716 remainingBytes = this.dataIn.readInt();
717 } else if (remainingBytes == 0) {
718 remainingBytes = -1;
719 return -1;
720 }
721
722 if (value.length <= remainingBytes) {
723 // small buffer
724 remainingBytes -= value.length;
725 this.dataIn.readFully(value);
726 return value.length;
727 } else {
728 // big buffer
729 int rc = this.dataIn.read(value, 0, remainingBytes);
730 remainingBytes = 0;
731 return rc;
732 }
733
734 } catch (EOFException e) {
735 JMSException jmsEx = new MessageEOFException(e.getMessage());
736 jmsEx.setLinkedException(e);
737 throw jmsEx;
738 } catch (IOException e) {
739 JMSException jmsEx = new MessageFormatException(e.getMessage());
740 jmsEx.setLinkedException(e);
741 throw jmsEx;
742 }
743 }
744
745 /**
746 * Reads an object from the stream message. <p/>
747 * <P>
748 * This method can be used to return, in objectified format, an object in
749 * the Java programming language ("Java object") that has been written to
750 * the stream with the equivalent <CODE>writeObject</CODE> method call, or
751 * its equivalent primitive <CODE>write<I>type</I></CODE> method. <p/>
752 * <P>
753 * Note that byte values are returned as <CODE>byte[]</CODE>, not
754 * <CODE>Byte[]</CODE>. <p/>
755 * <P>
756 * An attempt to call <CODE>readObject</CODE> to read a byte field value
757 * into a new <CODE>byte[]</CODE> object before the full value of the byte
758 * field has been read will throw a <CODE>MessageFormatException</CODE>.
759 *
760 * @return a Java object from the stream message, in objectified format (for
761 * example, if the object was written as an <CODE>int</CODE>, an
762 * <CODE>Integer</CODE> is returned)
763 * @throws JMSException if the JMS provider fails to read the message due to
764 * some internal error.
765 * @throws MessageEOFException if unexpected end of message stream has been
766 * reached.
767 * @throws MessageFormatException if this type conversion is invalid.
768 * @throws MessageNotReadableException if the message is in write-only mode.
769 * @see #readBytes(byte[] value)
770 */
771
772 public Object readObject() throws JMSException {
773 initializeReading();
774 try {
775 this.dataIn.mark(65);
776 int type = this.dataIn.read();
777 if (type == -1) {
778 throw new MessageEOFException("reached end of data");
779 }
780 if (type == MarshallingSupport.NULL) {
781 return null;
782 }
783 if (type == MarshallingSupport.BIG_STRING_TYPE) {
784 return MarshallingSupport.readUTF8(dataIn);
785 }
786 if (type == MarshallingSupport.STRING_TYPE) {
787 return this.dataIn.readUTF();
788 }
789 if (type == MarshallingSupport.LONG_TYPE) {
790 return Long.valueOf(this.dataIn.readLong());
791 }
792 if (type == MarshallingSupport.INTEGER_TYPE) {
793 return Integer.valueOf(this.dataIn.readInt());
794 }
795 if (type == MarshallingSupport.SHORT_TYPE) {
796 return Short.valueOf(this.dataIn.readShort());
797 }
798 if (type == MarshallingSupport.BYTE_TYPE) {
799 return Byte.valueOf(this.dataIn.readByte());
800 }
801 if (type == MarshallingSupport.FLOAT_TYPE) {
802 return new Float(this.dataIn.readFloat());
803 }
804 if (type == MarshallingSupport.DOUBLE_TYPE) {
805 return new Double(this.dataIn.readDouble());
806 }
807 if (type == MarshallingSupport.BOOLEAN_TYPE) {
808 return this.dataIn.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
809 }
810 if (type == MarshallingSupport.CHAR_TYPE) {
811 return Character.valueOf(this.dataIn.readChar());
812 }
813 if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
814 int len = this.dataIn.readInt();
815 byte[] value = new byte[len];
816 this.dataIn.readFully(value);
817 return value;
818 } else {
819 this.dataIn.reset();
820 throw new MessageFormatException("unknown type");
821 }
822 } catch (NumberFormatException mfe) {
823 try {
824 this.dataIn.reset();
825 } catch (IOException ioe) {
826 throw JMSExceptionSupport.create(ioe);
827 }
828 throw mfe;
829
830 } catch (EOFException e) {
831 JMSException jmsEx = new MessageEOFException(e.getMessage());
832 jmsEx.setLinkedException(e);
833 throw jmsEx;
834 } catch (IOException e) {
835 JMSException jmsEx = new MessageFormatException(e.getMessage());
836 jmsEx.setLinkedException(e);
837 throw jmsEx;
838 }
839 }
840
841 /**
842 * Writes a <code>boolean</code> to the stream message. The value
843 * <code>true</code> is written as the value <code>(byte)1</code>; the
844 * value <code>false</code> is written as the value <code>(byte)0</code>.
845 *
846 * @param value the <code>boolean</code> value to be written
847 * @throws JMSException if the JMS provider fails to write the message due
848 * to some internal error.
849 * @throws MessageNotWriteableException if the message is in read-only mode.
850 */
851
852 public void writeBoolean(boolean value) throws JMSException {
853 initializeWriting();
854 try {
855 MarshallingSupport.marshalBoolean(dataOut, value);
856 } catch (IOException ioe) {
857 throw JMSExceptionSupport.create(ioe);
858 }
859 }
860
861 /**
862 * Writes a <code>byte</code> to the stream message.
863 *
864 * @param value the <code>byte</code> value to be written
865 * @throws JMSException if the JMS provider fails to write the message due
866 * to some internal error.
867 * @throws MessageNotWriteableException if the message is in read-only mode.
868 */
869
870 public void writeByte(byte value) throws JMSException {
871 initializeWriting();
872 try {
873 MarshallingSupport.marshalByte(dataOut, value);
874 } catch (IOException ioe) {
875 throw JMSExceptionSupport.create(ioe);
876 }
877 }
878
879 /**
880 * Writes a <code>short</code> to the stream message.
881 *
882 * @param value the <code>short</code> value to be written
883 * @throws JMSException if the JMS provider fails to write the message due
884 * to some internal error.
885 * @throws MessageNotWriteableException if the message is in read-only mode.
886 */
887
888 public void writeShort(short value) throws JMSException {
889 initializeWriting();
890 try {
891 MarshallingSupport.marshalShort(dataOut, value);
892 } catch (IOException ioe) {
893 throw JMSExceptionSupport.create(ioe);
894 }
895 }
896
897 /**
898 * Writes a <code>char</code> to the stream message.
899 *
900 * @param value the <code>char</code> value to be written
901 * @throws JMSException if the JMS provider fails to write the message due
902 * to some internal error.
903 * @throws MessageNotWriteableException if the message is in read-only mode.
904 */
905
906 public void writeChar(char value) throws JMSException {
907 initializeWriting();
908 try {
909 MarshallingSupport.marshalChar(dataOut, value);
910 } catch (IOException ioe) {
911 throw JMSExceptionSupport.create(ioe);
912 }
913 }
914
915 /**
916 * Writes an <code>int</code> to the stream message.
917 *
918 * @param value the <code>int</code> value to be written
919 * @throws JMSException if the JMS provider fails to write the message due
920 * to some internal error.
921 * @throws MessageNotWriteableException if the message is in read-only mode.
922 */
923
924 public void writeInt(int value) throws JMSException {
925 initializeWriting();
926 try {
927 MarshallingSupport.marshalInt(dataOut, value);
928 } catch (IOException ioe) {
929 throw JMSExceptionSupport.create(ioe);
930 }
931 }
932
933 /**
934 * Writes a <code>long</code> to the stream message.
935 *
936 * @param value the <code>long</code> value to be written
937 * @throws JMSException if the JMS provider fails to write the message due
938 * to some internal error.
939 * @throws MessageNotWriteableException if the message is in read-only mode.
940 */
941
942 public void writeLong(long value) throws JMSException {
943 initializeWriting();
944 try {
945 MarshallingSupport.marshalLong(dataOut, value);
946 } catch (IOException ioe) {
947 throw JMSExceptionSupport.create(ioe);
948 }
949 }
950
951 /**
952 * Writes a <code>float</code> to the stream message.
953 *
954 * @param value the <code>float</code> value to be written
955 * @throws JMSException if the JMS provider fails to write the message due
956 * to some internal error.
957 * @throws MessageNotWriteableException if the message is in read-only mode.
958 */
959
960 public void writeFloat(float value) throws JMSException {
961 initializeWriting();
962 try {
963 MarshallingSupport.marshalFloat(dataOut, value);
964 } catch (IOException ioe) {
965 throw JMSExceptionSupport.create(ioe);
966 }
967 }
968
969 /**
970 * Writes a <code>double</code> to the stream message.
971 *
972 * @param value the <code>double</code> value to be written
973 * @throws JMSException if the JMS provider fails to write the message due
974 * to some internal error.
975 * @throws MessageNotWriteableException if the message is in read-only mode.
976 */
977
978 public void writeDouble(double value) throws JMSException {
979 initializeWriting();
980 try {
981 MarshallingSupport.marshalDouble(dataOut, value);
982 } catch (IOException ioe) {
983 throw JMSExceptionSupport.create(ioe);
984 }
985 }
986
987 /**
988 * Writes a <code>String</code> to the stream message.
989 *
990 * @param value the <code>String</code> value to be written
991 * @throws JMSException if the JMS provider fails to write the message due
992 * to some internal error.
993 * @throws MessageNotWriteableException if the message is in read-only mode.
994 */
995
996 public void writeString(String value) throws JMSException {
997 initializeWriting();
998 try {
999 if (value == null) {
1000 MarshallingSupport.marshalNull(dataOut);
1001 } else {
1002 MarshallingSupport.marshalString(dataOut, value);
1003 }
1004 } catch (IOException ioe) {
1005 throw JMSExceptionSupport.create(ioe);
1006 }
1007 }
1008
1009 /**
1010 * Writes a byte array field to the stream message. <p/>
1011 * <P>
1012 * The byte array <code>value</code> is written to the message as a byte
1013 * array field. Consecutively written byte array fields are treated as two
1014 * distinct fields when the fields are read.
1015 *
1016 * @param value the byte array value to be written
1017 * @throws JMSException if the JMS provider fails to write the message due
1018 * to some internal error.
1019 * @throws MessageNotWriteableException if the message is in read-only mode.
1020 */
1021
1022 public void writeBytes(byte[] value) throws JMSException {
1023 writeBytes(value, 0, value.length);
1024 }
1025
1026 /**
1027 * Writes a portion of a byte array as a byte array field to the stream
1028 * message. <p/>
1029 * <P>
1030 * The a portion of the byte array <code>value</code> is written to the
1031 * message as a byte array field. Consecutively written byte array fields
1032 * are treated as two distinct fields when the fields are read.
1033 *
1034 * @param value the byte array value to be written
1035 * @param offset the initial offset within the byte array
1036 * @param length the number of bytes to use
1037 * @throws JMSException if the JMS provider fails to write the message due
1038 * to some internal error.
1039 * @throws MessageNotWriteableException if the message is in read-only mode.
1040 */
1041
1042 public void writeBytes(byte[] value, int offset, int length) throws JMSException {
1043 initializeWriting();
1044 try {
1045 MarshallingSupport.marshalByteArray(dataOut, value, offset, length);
1046 } catch (IOException ioe) {
1047 throw JMSExceptionSupport.create(ioe);
1048 }
1049 }
1050
1051 /**
1052 * Writes an object to the stream message. <p/>
1053 * <P>
1054 * This method works only for the objectified primitive object types (<code>Integer</code>,
1055 * <code>Double</code>, <code>Long</code> ...),
1056 * <code>String</code> objects, and byte arrays.
1057 *
1058 * @param value the Java object to be written
1059 * @throws JMSException if the JMS provider fails to write the message due
1060 * to some internal error.
1061 * @throws MessageFormatException if the object is invalid.
1062 * @throws MessageNotWriteableException if the message is in read-only mode.
1063 */
1064
1065 public void writeObject(Object value) throws JMSException {
1066 initializeWriting();
1067 if (value == null) {
1068 try {
1069 MarshallingSupport.marshalNull(dataOut);
1070 } catch (IOException ioe) {
1071 throw JMSExceptionSupport.create(ioe);
1072 }
1073 } else if (value instanceof String) {
1074 writeString(value.toString());
1075 } else if (value instanceof Character) {
1076 writeChar(((Character)value).charValue());
1077 } else if (value instanceof Boolean) {
1078 writeBoolean(((Boolean)value).booleanValue());
1079 } else if (value instanceof Byte) {
1080 writeByte(((Byte)value).byteValue());
1081 } else if (value instanceof Short) {
1082 writeShort(((Short)value).shortValue());
1083 } else if (value instanceof Integer) {
1084 writeInt(((Integer)value).intValue());
1085 } else if (value instanceof Float) {
1086 writeFloat(((Float)value).floatValue());
1087 } else if (value instanceof Double) {
1088 writeDouble(((Double)value).doubleValue());
1089 } else if (value instanceof byte[]) {
1090 writeBytes((byte[])value);
1091 }else if (value instanceof Long) {
1092 writeLong(((Long)value).longValue());
1093 }else {
1094 throw new MessageFormatException("Unsupported Object type: " + value.getClass());
1095 }
1096 }
1097
1098 /**
1099 * Puts the message body in read-only mode and repositions the stream of
1100 * bytes to the beginning.
1101 *
1102 * @throws JMSException if an internal error occurs
1103 */
1104
1105 public void reset() throws JMSException {
1106 storeContent();
1107 this.bytesOut = null;
1108 this.dataIn = null;
1109 this.dataOut = null;
1110 this.remainingBytes = -1;
1111 setReadOnlyBody(true);
1112 }
1113
1114 private void initializeWriting() throws MessageNotWriteableException {
1115 checkReadOnlyBody();
1116 if (this.dataOut == null) {
1117 this.bytesOut = new ByteArrayOutputStream();
1118 OutputStream os = bytesOut;
1119 ActiveMQConnection connection = getConnection();
1120 if (connection != null && connection.isUseCompression()) {
1121 compressed = true;
1122 os = new DeflaterOutputStream(os);
1123 }
1124 this.dataOut = new DataOutputStream(os);
1125 }
1126 }
1127
1128 protected void checkWriteOnlyBody() throws MessageNotReadableException {
1129 if (!readOnlyBody) {
1130 throw new MessageNotReadableException("Message body is write-only");
1131 }
1132 }
1133
1134 private void initializeReading() throws MessageNotReadableException {
1135 checkWriteOnlyBody();
1136 if (this.dataIn == null) {
1137 ByteSequence data = getContent();
1138 if (data == null) {
1139 data = new ByteSequence(new byte[] {}, 0, 0);
1140 }
1141 InputStream is = new ByteArrayInputStream(data);
1142 if (isCompressed()) {
1143 is = new InflaterInputStream(is);
1144 is = new BufferedInputStream(is);
1145 }
1146 this.dataIn = new DataInputStream(is);
1147 }
1148 }
1149
1150 @Override
1151 public void compress() throws IOException {
1152 storeContent();
1153 super.compress();
1154 }
1155
1156 public String toString() {
1157 return super.toString() + " ActiveMQStreamMessage{ " + "bytesOut = " + bytesOut + ", dataOut = " + dataOut + ", dataIn = " + dataIn + " }";
1158 }
1159 }