001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.command;
018
019 import java.io.DataInputStream;
020 import java.io.DataOutputStream;
021 import java.io.IOException;
022 import java.io.OutputStream;
023 import java.util.Collections;
024 import java.util.HashMap;
025 import java.util.Map;
026 import java.util.zip.DeflaterOutputStream;
027
028 import javax.jms.JMSException;
029
030 import org.apache.activemq.ActiveMQConnection;
031 import org.apache.activemq.advisory.AdvisorySupport;
032 import org.apache.activemq.broker.region.Destination;
033 import org.apache.activemq.broker.region.MessageReference;
034 import org.apache.activemq.broker.region.RegionBroker;
035 import org.apache.activemq.usage.MemoryUsage;
036 import org.apache.activemq.util.ByteArrayInputStream;
037 import org.apache.activemq.util.ByteArrayOutputStream;
038 import org.apache.activemq.util.ByteSequence;
039 import org.apache.activemq.util.MarshallingSupport;
040 import org.apache.activemq.wireformat.WireFormat;
041
042 /**
043 * Represents an ActiveMQ message
044 *
045 * @openwire:marshaller
046 *
047 */
048 public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
049
050 /**
051 * The default minimum amount of memory a message is assumed to use
052 */
053 public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
054
055 protected MessageId messageId;
056 protected ActiveMQDestination originalDestination;
057 protected TransactionId originalTransactionId;
058
059 protected ProducerId producerId;
060 protected ActiveMQDestination destination;
061 protected TransactionId transactionId;
062
063 protected long expiration;
064 protected long timestamp;
065 protected long arrival;
066 protected long brokerInTime;
067 protected long brokerOutTime;
068 protected String correlationId;
069 protected ActiveMQDestination replyTo;
070 protected boolean persistent;
071 protected String type;
072 protected byte priority;
073 protected String groupID;
074 protected int groupSequence;
075 protected ConsumerId targetConsumerId;
076 protected boolean compressed;
077 protected String userID;
078
079 protected ByteSequence content;
080 protected ByteSequence marshalledProperties;
081 protected DataStructure dataStructure;
082 protected int redeliveryCounter;
083
084 protected int size;
085 protected Map<String, Object> properties;
086 protected boolean readOnlyProperties;
087 protected boolean readOnlyBody;
088 protected transient boolean recievedByDFBridge;
089 protected boolean droppable;
090
091 private transient short referenceCount;
092 private transient ActiveMQConnection connection;
093 private transient org.apache.activemq.broker.region.Destination regionDestination;
094 private transient MemoryUsage memoryUsage;
095
096 private BrokerId[] brokerPath;
097 private BrokerId[] cluster;
098
099 public abstract Message copy();
100 public abstract void clearBody() throws JMSException;
101 public abstract void storeContent();
102
103 // useful to reduce the memory footprint of a persisted message
104 public void clearMarshalledState() throws JMSException {
105 properties = null;
106 }
107
108 protected void copy(Message copy) {
109 super.copy(copy);
110 copy.producerId = producerId;
111 copy.transactionId = transactionId;
112 copy.destination = destination;
113 copy.messageId = messageId != null ? messageId.copy() : null;
114 copy.originalDestination = originalDestination;
115 copy.originalTransactionId = originalTransactionId;
116 copy.expiration = expiration;
117 copy.timestamp = timestamp;
118 copy.correlationId = correlationId;
119 copy.replyTo = replyTo;
120 copy.persistent = persistent;
121 copy.redeliveryCounter = redeliveryCounter;
122 copy.type = type;
123 copy.priority = priority;
124 copy.size = size;
125 copy.groupID = groupID;
126 copy.userID = userID;
127 copy.groupSequence = groupSequence;
128
129 if (properties != null) {
130 copy.properties = new HashMap<String, Object>(properties);
131
132 // The new message hasn't expired, so remove this feild.
133 copy.properties.remove(RegionBroker.ORIGINAL_EXPIRATION);
134 } else {
135 copy.properties = properties;
136 }
137
138 copy.content = content;
139 copy.marshalledProperties = marshalledProperties;
140 copy.dataStructure = dataStructure;
141 copy.readOnlyProperties = readOnlyProperties;
142 copy.readOnlyBody = readOnlyBody;
143 copy.compressed = compressed;
144 copy.recievedByDFBridge = recievedByDFBridge;
145
146 copy.arrival = arrival;
147 copy.connection = connection;
148 copy.regionDestination = regionDestination;
149 copy.brokerInTime = brokerInTime;
150 copy.brokerOutTime = brokerOutTime;
151 copy.memoryUsage=this.memoryUsage;
152 copy.brokerPath = brokerPath;
153
154 // lets not copy the following fields
155 // copy.targetConsumerId = targetConsumerId;
156 // copy.referenceCount = referenceCount;
157 }
158
159 public Object getProperty(String name) throws IOException {
160 if (properties == null) {
161 if (marshalledProperties == null) {
162 return null;
163 }
164 properties = unmarsallProperties(marshalledProperties);
165 }
166 return properties.get(name);
167 }
168
169 @SuppressWarnings("unchecked")
170 public Map<String, Object> getProperties() throws IOException {
171 if (properties == null) {
172 if (marshalledProperties == null) {
173 return Collections.EMPTY_MAP;
174 }
175 properties = unmarsallProperties(marshalledProperties);
176 }
177 return Collections.unmodifiableMap(properties);
178 }
179
180 public void clearProperties() {
181 marshalledProperties = null;
182 properties = null;
183 }
184
185 public void setProperty(String name, Object value) throws IOException {
186 lazyCreateProperties();
187 properties.put(name, value);
188 }
189
190 public void removeProperty(String name) throws IOException {
191 lazyCreateProperties();
192 properties.remove(name);
193 }
194
195 protected void lazyCreateProperties() throws IOException {
196 if (properties == null) {
197 if (marshalledProperties == null) {
198 properties = new HashMap<String, Object>();
199 } else {
200 properties = unmarsallProperties(marshalledProperties);
201 marshalledProperties = null;
202 }
203 }
204 }
205
206 private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
207 return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
208 }
209
210 public void beforeMarshall(WireFormat wireFormat) throws IOException {
211 // Need to marshal the properties.
212 if (marshalledProperties == null && properties != null) {
213 ByteArrayOutputStream baos = new ByteArrayOutputStream();
214 DataOutputStream os = new DataOutputStream(baos);
215 MarshallingSupport.marshalPrimitiveMap(properties, os);
216 os.close();
217 marshalledProperties = baos.toByteSequence();
218 }
219 }
220
221 public void afterMarshall(WireFormat wireFormat) throws IOException {
222 }
223
224 public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
225 }
226
227 public void afterUnmarshall(WireFormat wireFormat) throws IOException {
228 }
229
230 // /////////////////////////////////////////////////////////////////
231 //
232 // Simple Field accessors
233 //
234 // /////////////////////////////////////////////////////////////////
235
236 /**
237 * @openwire:property version=1 cache=true
238 */
239 public ProducerId getProducerId() {
240 return producerId;
241 }
242
243 public void setProducerId(ProducerId producerId) {
244 this.producerId = producerId;
245 }
246
247 /**
248 * @openwire:property version=1 cache=true
249 */
250 public ActiveMQDestination getDestination() {
251 return destination;
252 }
253
254 public void setDestination(ActiveMQDestination destination) {
255 this.destination = destination;
256 }
257
258 /**
259 * @openwire:property version=1 cache=true
260 */
261 public TransactionId getTransactionId() {
262 return transactionId;
263 }
264
265 public void setTransactionId(TransactionId transactionId) {
266 this.transactionId = transactionId;
267 }
268
269 public boolean isInTransaction() {
270 return transactionId != null;
271 }
272
273 /**
274 * @openwire:property version=1 cache=true
275 */
276 public ActiveMQDestination getOriginalDestination() {
277 return originalDestination;
278 }
279
280 public void setOriginalDestination(ActiveMQDestination destination) {
281 this.originalDestination = destination;
282 }
283
284 /**
285 * @openwire:property version=1
286 */
287 public MessageId getMessageId() {
288 return messageId;
289 }
290
291 public void setMessageId(MessageId messageId) {
292 this.messageId = messageId;
293 }
294
295 /**
296 * @openwire:property version=1 cache=true
297 */
298 public TransactionId getOriginalTransactionId() {
299 return originalTransactionId;
300 }
301
302 public void setOriginalTransactionId(TransactionId transactionId) {
303 this.originalTransactionId = transactionId;
304 }
305
306 /**
307 * @openwire:property version=1
308 */
309 public String getGroupID() {
310 return groupID;
311 }
312
313 public void setGroupID(String groupID) {
314 this.groupID = groupID;
315 }
316
317 /**
318 * @openwire:property version=1
319 */
320 public int getGroupSequence() {
321 return groupSequence;
322 }
323
324 public void setGroupSequence(int groupSequence) {
325 this.groupSequence = groupSequence;
326 }
327
328 /**
329 * @openwire:property version=1
330 */
331 public String getCorrelationId() {
332 return correlationId;
333 }
334
335 public void setCorrelationId(String correlationId) {
336 this.correlationId = correlationId;
337 }
338
339 /**
340 * @openwire:property version=1
341 */
342 public boolean isPersistent() {
343 return persistent;
344 }
345
346 public void setPersistent(boolean deliveryMode) {
347 this.persistent = deliveryMode;
348 }
349
350 /**
351 * @openwire:property version=1
352 */
353 public long getExpiration() {
354 return expiration;
355 }
356
357 public void setExpiration(long expiration) {
358 this.expiration = expiration;
359 }
360
361 /**
362 * @openwire:property version=1
363 */
364 public byte getPriority() {
365 return priority;
366 }
367
368 public void setPriority(byte priority) {
369 if (priority < 0) {
370 this.priority = 0;
371 } else if (priority > 9) {
372 this.priority = 9;
373 } else {
374 this.priority = priority;
375 }
376 }
377
378 /**
379 * @openwire:property version=1
380 */
381 public ActiveMQDestination getReplyTo() {
382 return replyTo;
383 }
384
385 public void setReplyTo(ActiveMQDestination replyTo) {
386 this.replyTo = replyTo;
387 }
388
389 /**
390 * @openwire:property version=1
391 */
392 public long getTimestamp() {
393 return timestamp;
394 }
395
396 public void setTimestamp(long timestamp) {
397 this.timestamp = timestamp;
398 }
399
400 /**
401 * @openwire:property version=1
402 */
403 public String getType() {
404 return type;
405 }
406
407 public void setType(String type) {
408 this.type = type;
409 }
410
411 /**
412 * @openwire:property version=1
413 */
414 public ByteSequence getContent() {
415 return content;
416 }
417
418 public void setContent(ByteSequence content) {
419 this.content = content;
420 }
421
422 /**
423 * @openwire:property version=1
424 */
425 public ByteSequence getMarshalledProperties() {
426 return marshalledProperties;
427 }
428
429 public void setMarshalledProperties(ByteSequence marshalledProperties) {
430 this.marshalledProperties = marshalledProperties;
431 }
432
433 /**
434 * @openwire:property version=1
435 */
436 public DataStructure getDataStructure() {
437 return dataStructure;
438 }
439
440 public void setDataStructure(DataStructure data) {
441 this.dataStructure = data;
442 }
443
444 /**
445 * Can be used to route the message to a specific consumer. Should be null
446 * to allow the broker use normal JMS routing semantics. If the target
447 * consumer id is an active consumer on the broker, the message is dropped.
448 * Used by the AdvisoryBroker to replay advisory messages to a specific
449 * consumer.
450 *
451 * @openwire:property version=1 cache=true
452 */
453 public ConsumerId getTargetConsumerId() {
454 return targetConsumerId;
455 }
456
457 public void setTargetConsumerId(ConsumerId targetConsumerId) {
458 this.targetConsumerId = targetConsumerId;
459 }
460
461 public boolean isExpired() {
462 long expireTime = getExpiration();
463 return expireTime > 0 && System.currentTimeMillis() > expireTime;
464 }
465
466 public boolean isAdvisory() {
467 return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
468 }
469
470 /**
471 * @openwire:property version=1
472 */
473 public boolean isCompressed() {
474 return compressed;
475 }
476
477 public void setCompressed(boolean compressed) {
478 this.compressed = compressed;
479 }
480
481 public boolean isRedelivered() {
482 return redeliveryCounter > 0;
483 }
484
485 public void setRedelivered(boolean redelivered) {
486 if (redelivered) {
487 if (!isRedelivered()) {
488 setRedeliveryCounter(1);
489 }
490 } else {
491 if (isRedelivered()) {
492 setRedeliveryCounter(0);
493 }
494 }
495 }
496
497 public void incrementRedeliveryCounter() {
498 redeliveryCounter++;
499 }
500
501 /**
502 * @openwire:property version=1
503 */
504 public int getRedeliveryCounter() {
505 return redeliveryCounter;
506 }
507
508 public void setRedeliveryCounter(int deliveryCounter) {
509 this.redeliveryCounter = deliveryCounter;
510 }
511
512 /**
513 * The route of brokers the command has moved through.
514 *
515 * @openwire:property version=1 cache=true
516 */
517 public BrokerId[] getBrokerPath() {
518 return brokerPath;
519 }
520
521 public void setBrokerPath(BrokerId[] brokerPath) {
522 this.brokerPath = brokerPath;
523 }
524
525 public boolean isReadOnlyProperties() {
526 return readOnlyProperties;
527 }
528
529 public void setReadOnlyProperties(boolean readOnlyProperties) {
530 this.readOnlyProperties = readOnlyProperties;
531 }
532
533 public boolean isReadOnlyBody() {
534 return readOnlyBody;
535 }
536
537 public void setReadOnlyBody(boolean readOnlyBody) {
538 this.readOnlyBody = readOnlyBody;
539 }
540
541 public ActiveMQConnection getConnection() {
542 return this.connection;
543 }
544
545 public void setConnection(ActiveMQConnection connection) {
546 this.connection = connection;
547 }
548
549 /**
550 * Used to schedule the arrival time of a message to a broker. The broker
551 * will not dispatch a message to a consumer until it's arrival time has
552 * elapsed.
553 *
554 * @openwire:property version=1
555 */
556 public long getArrival() {
557 return arrival;
558 }
559
560 public void setArrival(long arrival) {
561 this.arrival = arrival;
562 }
563
564 /**
565 * Only set by the broker and defines the userID of the producer connection
566 * who sent this message. This is an optional field, it needs to be enabled
567 * on the broker to have this field populated.
568 *
569 * @openwire:property version=1
570 */
571 public String getUserID() {
572 return userID;
573 }
574
575 public void setUserID(String jmsxUserID) {
576 this.userID = jmsxUserID;
577 }
578
579 public int getReferenceCount() {
580 return referenceCount;
581 }
582
583 public Message getMessageHardRef() {
584 return this;
585 }
586
587 public Message getMessage() {
588 return this;
589 }
590
591 public org.apache.activemq.broker.region.Destination getRegionDestination() {
592 return regionDestination;
593 }
594
595 public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
596 this.regionDestination = destination;
597 if(this.memoryUsage==null) {
598 this.memoryUsage=regionDestination.getMemoryUsage();
599 }
600 }
601
602 public MemoryUsage getMemoryUsage() {
603 return this.memoryUsage;
604 }
605
606 public void setMemoryUsage(MemoryUsage usage) {
607 this.memoryUsage=usage;
608 }
609
610 @Override
611 public boolean isMarshallAware() {
612 return true;
613 }
614
615 public int incrementReferenceCount() {
616 int rc;
617 int size;
618 synchronized (this) {
619 rc = ++referenceCount;
620 size = getSize();
621 }
622
623 if (rc == 1 && getMemoryUsage() != null) {
624 getMemoryUsage().increaseUsage(size);
625 //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
626
627 }
628
629 //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
630 return rc;
631 }
632
633 public int decrementReferenceCount() {
634 int rc;
635 int size;
636 synchronized (this) {
637 rc = --referenceCount;
638 size = getSize();
639 }
640
641 if (rc == 0 && getMemoryUsage() != null) {
642 getMemoryUsage().decreaseUsage(size);
643 //Thread.dumpStack();
644 //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
645 }
646
647 //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
648
649 return rc;
650 }
651
652 public int getSize() {
653 int minimumMessageSize = getMinimumMessageSize();
654 if (size < minimumMessageSize || size == 0) {
655 size = minimumMessageSize;
656 if (marshalledProperties != null) {
657 size += marshalledProperties.getLength();
658 }
659 if (content != null) {
660 size += content.getLength();
661 }
662 }
663 return size;
664 }
665
666 protected int getMinimumMessageSize() {
667 int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
668 //let destination override
669 Destination dest = regionDestination;
670 if (dest != null) {
671 result=dest.getMinimumMessageSize();
672 }
673 return result;
674 }
675
676 /**
677 * @openwire:property version=1
678 * @return Returns the recievedByDFBridge.
679 */
680 public boolean isRecievedByDFBridge() {
681 return recievedByDFBridge;
682 }
683
684 /**
685 * @param recievedByDFBridge The recievedByDFBridge to set.
686 */
687 public void setRecievedByDFBridge(boolean recievedByDFBridge) {
688 this.recievedByDFBridge = recievedByDFBridge;
689 }
690
691 public void onMessageRolledBack() {
692 incrementRedeliveryCounter();
693 }
694
695 /**
696 * @openwire:property version=2 cache=true
697 */
698 public boolean isDroppable() {
699 return droppable;
700 }
701
702 public void setDroppable(boolean droppable) {
703 this.droppable = droppable;
704 }
705
706 /**
707 * If a message is stored in multiple nodes on a cluster, all the cluster
708 * members will be listed here. Otherwise, it will be null.
709 *
710 * @openwire:property version=3 cache=true
711 */
712 public BrokerId[] getCluster() {
713 return cluster;
714 }
715
716 public void setCluster(BrokerId[] cluster) {
717 this.cluster = cluster;
718 }
719
720 @Override
721 public boolean isMessage() {
722 return true;
723 }
724
725 /**
726 * @openwire:property version=3
727 */
728 public long getBrokerInTime() {
729 return this.brokerInTime;
730 }
731
732 public void setBrokerInTime(long brokerInTime) {
733 this.brokerInTime = brokerInTime;
734 }
735
736 /**
737 * @openwire:property version=3
738 */
739 public long getBrokerOutTime() {
740 return this.brokerOutTime;
741 }
742
743 public void setBrokerOutTime(long brokerOutTime) {
744 this.brokerOutTime = brokerOutTime;
745 }
746
747 public boolean isDropped() {
748 return false;
749 }
750
751 public void compress() throws IOException {
752 if (!isCompressed()) {
753 storeContent();
754 if (!isCompressed() && getContent() != null) {
755 doCompress();
756 }
757 }
758 }
759
760 protected void doCompress() throws IOException {
761 compressed = true;
762 ByteSequence bytes = getContent();
763 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
764 OutputStream os = new DeflaterOutputStream(bytesOut);
765 os.write(bytes.data, bytes.offset, bytes.length);
766 os.close();
767 setContent(bytesOut.toByteSequence());
768 }
769
770 @Override
771 public String toString() {
772 return toString(null);
773 }
774
775 @Override
776 public String toString(Map<String, Object>overrideFields) {
777 try {
778 getProperties();
779 } catch (IOException e) {
780 }
781 return super.toString(overrideFields);
782 }
783 }