001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.beans.Transient;
020import java.io.DataInputStream;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Map;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.zip.DeflaterOutputStream;
029
030import javax.jms.JMSException;
031
032import org.apache.activemq.ActiveMQConnection;
033import org.apache.activemq.advisory.AdvisorySupport;
034import org.apache.activemq.broker.region.MessageReference;
035import org.apache.activemq.usage.MemoryUsage;
036import org.apache.activemq.util.ByteArrayInputStream;
037import org.apache.activemq.util.ByteArrayOutputStream;
038import org.apache.activemq.util.ByteSequence;
039import org.apache.activemq.util.MarshallingSupport;
040import org.apache.activemq.wireformat.WireFormat;
041import org.fusesource.hawtbuf.UTF8Buffer;
042
043/**
044 * Represents an ActiveMQ message
045 *
046 * @openwire:marshaller
047 *
048 */
049public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
050    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
051
052    /**
053     * The default minimum amount of memory a message is assumed to use
054     */
055    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
056
057    protected MessageId messageId;
058    protected ActiveMQDestination originalDestination;
059    protected TransactionId originalTransactionId;
060
061    protected ProducerId producerId;
062    protected ActiveMQDestination destination;
063    protected TransactionId transactionId;
064
065    protected long expiration;
066    protected long timestamp;
067    protected long arrival;
068    protected long brokerInTime;
069    protected long brokerOutTime;
070    protected String correlationId;
071    protected ActiveMQDestination replyTo;
072    protected boolean persistent;
073    protected String type;
074    protected byte priority;
075    protected String groupID;
076    protected int groupSequence;
077    protected ConsumerId targetConsumerId;
078    protected boolean compressed;
079    protected String userID;
080
081    protected ByteSequence content;
082    protected ByteSequence marshalledProperties;
083    protected DataStructure dataStructure;
084    protected int redeliveryCounter;
085
086    protected int size;
087    protected Map<String, Object> properties;
088    protected boolean readOnlyProperties;
089    protected boolean readOnlyBody;
090    protected transient boolean recievedByDFBridge;
091    protected boolean droppable;
092    protected boolean jmsXGroupFirstForConsumer;
093
094    private transient short referenceCount;
095    private transient ActiveMQConnection connection;
096    transient MessageDestination regionDestination;
097    transient MemoryUsage memoryUsage;
098    transient AtomicBoolean processAsExpired = new AtomicBoolean(false);
099
100    private BrokerId[] brokerPath;
101    private BrokerId[] cluster;
102
103    public static interface MessageDestination {
104        int getMinimumMessageSize();
105        MemoryUsage getMemoryUsage();
106    }
107
108    public abstract Message copy();
109    public abstract void clearBody() throws JMSException;
110    public abstract void storeContent();
111    public abstract void storeContentAndClear();
112
113    /**
114     * @deprecated - This method name is misnamed
115     * @throws JMSException
116     */
117    public void clearMarshalledState() throws JMSException {
118        clearUnMarshalledState();
119    }
120
121    // useful to reduce the memory footprint of a persisted message
122    public void clearUnMarshalledState() throws JMSException {
123        properties = null;
124    }
125
126    public boolean isMarshalled() {
127        return content != null && (marshalledProperties != null || properties == null);
128    }
129
130    protected void copy(Message copy) {
131        super.copy(copy);
132        copy.producerId = producerId;
133        copy.transactionId = transactionId;
134        copy.destination = destination;
135        copy.messageId = messageId != null ? messageId.copy() : null;
136        copy.originalDestination = originalDestination;
137        copy.originalTransactionId = originalTransactionId;
138        copy.expiration = expiration;
139        copy.timestamp = timestamp;
140        copy.correlationId = correlationId;
141        copy.replyTo = replyTo;
142        copy.persistent = persistent;
143        copy.redeliveryCounter = redeliveryCounter;
144        copy.type = type;
145        copy.priority = priority;
146        copy.size = size;
147        copy.groupID = groupID;
148        copy.userID = userID;
149        copy.groupSequence = groupSequence;
150
151        if (properties != null) {
152            copy.properties = new HashMap<String, Object>(properties);
153
154            // The new message hasn't expired, so remove this feild.
155            copy.properties.remove(ORIGINAL_EXPIRATION);
156        } else {
157            copy.properties = properties;
158        }
159
160        copy.content = copyByteSequence(content);
161        copy.marshalledProperties = copyByteSequence(marshalledProperties);
162        copy.dataStructure = dataStructure;
163        copy.readOnlyProperties = readOnlyProperties;
164        copy.readOnlyBody = readOnlyBody;
165        copy.compressed = compressed;
166        copy.recievedByDFBridge = recievedByDFBridge;
167
168        copy.arrival = arrival;
169        copy.connection = connection;
170        copy.regionDestination = regionDestination;
171        copy.brokerInTime = brokerInTime;
172        copy.brokerOutTime = brokerOutTime;
173        copy.memoryUsage=this.memoryUsage;
174        copy.brokerPath = brokerPath;
175        copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer;
176
177        // lets not copy the following fields
178        // copy.targetConsumerId = targetConsumerId;
179        // copy.referenceCount = referenceCount;
180    }
181
182    private ByteSequence copyByteSequence(ByteSequence content) {
183        if (content != null) {
184            return new ByteSequence(content.getData(), content.getOffset(), content.getLength());
185        }
186        return null;
187    }
188
189    public Object getProperty(String name) throws IOException {
190        if (properties == null) {
191            if (marshalledProperties == null) {
192                return null;
193            }
194            properties = unmarsallProperties(marshalledProperties);
195        }
196        Object result = properties.get(name);
197        if (result instanceof UTF8Buffer) {
198            result = result.toString();
199        }
200
201        return result;
202    }
203
204    @SuppressWarnings("unchecked")
205    public Map<String, Object> getProperties() throws IOException {
206        if (properties == null) {
207            if (marshalledProperties == null) {
208                return Collections.EMPTY_MAP;
209            }
210            properties = unmarsallProperties(marshalledProperties);
211        }
212        return Collections.unmodifiableMap(properties);
213    }
214
215    public void clearProperties() {
216        marshalledProperties = null;
217        properties = null;
218    }
219
220    public void setProperty(String name, Object value) throws IOException {
221        lazyCreateProperties();
222        properties.put(name, value);
223    }
224
225    public void removeProperty(String name) throws IOException {
226        lazyCreateProperties();
227        properties.remove(name);
228    }
229
230    protected void lazyCreateProperties() throws IOException {
231        if (properties == null) {
232            if (marshalledProperties == null) {
233                properties = new HashMap<String, Object>();
234            } else {
235                properties = unmarsallProperties(marshalledProperties);
236                marshalledProperties = null;
237            }
238        } else {
239            marshalledProperties = null;
240        }
241    }
242
243    private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
244        return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
245    }
246
247    @Override
248        public void beforeMarshall(WireFormat wireFormat) throws IOException {
249        // Need to marshal the properties.
250        if (marshalledProperties == null && properties != null) {
251            ByteArrayOutputStream baos = new ByteArrayOutputStream();
252            DataOutputStream os = new DataOutputStream(baos);
253            MarshallingSupport.marshalPrimitiveMap(properties, os);
254            os.close();
255            marshalledProperties = baos.toByteSequence();
256        }
257    }
258
259    @Override
260        public void afterMarshall(WireFormat wireFormat) throws IOException {
261    }
262
263    @Override
264        public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
265    }
266
267    @Override
268        public void afterUnmarshall(WireFormat wireFormat) throws IOException {
269    }
270
271    // /////////////////////////////////////////////////////////////////
272    //
273    // Simple Field accessors
274    //
275    // /////////////////////////////////////////////////////////////////
276
277    /**
278     * @openwire:property version=1 cache=true
279     */
280    public ProducerId getProducerId() {
281        return producerId;
282    }
283
284    public void setProducerId(ProducerId producerId) {
285        this.producerId = producerId;
286    }
287
288    /**
289     * @openwire:property version=1 cache=true
290     */
291    public ActiveMQDestination getDestination() {
292        return destination;
293    }
294
295    public void setDestination(ActiveMQDestination destination) {
296        this.destination = destination;
297    }
298
299    /**
300     * @openwire:property version=1 cache=true
301     */
302    public TransactionId getTransactionId() {
303        return transactionId;
304    }
305
306    public void setTransactionId(TransactionId transactionId) {
307        this.transactionId = transactionId;
308    }
309
310    public boolean isInTransaction() {
311        return transactionId != null;
312    }
313
314    /**
315     * @openwire:property version=1 cache=true
316     */
317    public ActiveMQDestination getOriginalDestination() {
318        return originalDestination;
319    }
320
321    public void setOriginalDestination(ActiveMQDestination destination) {
322        this.originalDestination = destination;
323    }
324
325    /**
326     * @openwire:property version=1
327     */
328    @Override
329        public MessageId getMessageId() {
330        return messageId;
331    }
332
333    public void setMessageId(MessageId messageId) {
334        this.messageId = messageId;
335    }
336
337    /**
338     * @openwire:property version=1 cache=true
339     */
340    public TransactionId getOriginalTransactionId() {
341        return originalTransactionId;
342    }
343
344    public void setOriginalTransactionId(TransactionId transactionId) {
345        this.originalTransactionId = transactionId;
346    }
347
348    /**
349     * @openwire:property version=1
350     */
351    @Override
352        public String getGroupID() {
353        return groupID;
354    }
355
356    public void setGroupID(String groupID) {
357        this.groupID = groupID;
358    }
359
360    /**
361     * @openwire:property version=1
362     */
363    @Override
364        public int getGroupSequence() {
365        return groupSequence;
366    }
367
368    public void setGroupSequence(int groupSequence) {
369        this.groupSequence = groupSequence;
370    }
371
372    /**
373     * @openwire:property version=1
374     */
375    public String getCorrelationId() {
376        return correlationId;
377    }
378
379    public void setCorrelationId(String correlationId) {
380        this.correlationId = correlationId;
381    }
382
383    /**
384     * @openwire:property version=1
385     */
386    @Override
387        public boolean isPersistent() {
388        return persistent;
389    }
390
391    public void setPersistent(boolean deliveryMode) {
392        this.persistent = deliveryMode;
393    }
394
395    /**
396     * @openwire:property version=1
397     */
398    @Override
399        public long getExpiration() {
400        return expiration;
401    }
402
403    public void setExpiration(long expiration) {
404        this.expiration = expiration;
405    }
406
407    /**
408     * @openwire:property version=1
409     */
410    public byte getPriority() {
411        return priority;
412    }
413
414    public void setPriority(byte priority) {
415        if (priority < 0) {
416            this.priority = 0;
417        } else if (priority > 9) {
418            this.priority = 9;
419        } else {
420            this.priority = priority;
421        }
422    }
423
424    /**
425     * @openwire:property version=1
426     */
427    public ActiveMQDestination getReplyTo() {
428        return replyTo;
429    }
430
431    public void setReplyTo(ActiveMQDestination replyTo) {
432        this.replyTo = replyTo;
433    }
434
435    /**
436     * @openwire:property version=1
437     */
438    public long getTimestamp() {
439        return timestamp;
440    }
441
442    public void setTimestamp(long timestamp) {
443        this.timestamp = timestamp;
444    }
445
446    /**
447     * @openwire:property version=1
448     */
449    public String getType() {
450        return type;
451    }
452
453    public void setType(String type) {
454        this.type = type;
455    }
456
457    /**
458     * @openwire:property version=1
459     */
460    public ByteSequence getContent() {
461        return content;
462    }
463
464    public void setContent(ByteSequence content) {
465        this.content = content;
466    }
467
468    /**
469     * @openwire:property version=1
470     */
471    public ByteSequence getMarshalledProperties() {
472        return marshalledProperties;
473    }
474
475    public void setMarshalledProperties(ByteSequence marshalledProperties) {
476        this.marshalledProperties = marshalledProperties;
477    }
478
479    /**
480     * @openwire:property version=1
481     */
482    public DataStructure getDataStructure() {
483        return dataStructure;
484    }
485
486    public void setDataStructure(DataStructure data) {
487        this.dataStructure = data;
488    }
489
490    /**
491     * Can be used to route the message to a specific consumer. Should be null
492     * to allow the broker use normal JMS routing semantics. If the target
493     * consumer id is an active consumer on the broker, the message is dropped.
494     * Used by the AdvisoryBroker to replay advisory messages to a specific
495     * consumer.
496     *
497     * @openwire:property version=1 cache=true
498     */
499    @Override
500        public ConsumerId getTargetConsumerId() {
501        return targetConsumerId;
502    }
503
504    public void setTargetConsumerId(ConsumerId targetConsumerId) {
505        this.targetConsumerId = targetConsumerId;
506    }
507
508    @Override
509        public boolean isExpired() {
510        long expireTime = getExpiration();
511        return expireTime > 0 && System.currentTimeMillis() > expireTime;
512    }
513
514    @Override
515        public boolean isAdvisory() {
516        return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
517    }
518
519    /**
520     * @openwire:property version=1
521     */
522    public boolean isCompressed() {
523        return compressed;
524    }
525
526    public void setCompressed(boolean compressed) {
527        this.compressed = compressed;
528    }
529
530    public boolean isRedelivered() {
531        return redeliveryCounter > 0;
532    }
533
534    public void setRedelivered(boolean redelivered) {
535        if (redelivered) {
536            if (!isRedelivered()) {
537                setRedeliveryCounter(1);
538            }
539        } else {
540            if (isRedelivered()) {
541                setRedeliveryCounter(0);
542            }
543        }
544    }
545
546    @Override
547        public void incrementRedeliveryCounter() {
548        redeliveryCounter++;
549    }
550
551    /**
552     * @openwire:property version=1
553     */
554    @Override
555        public int getRedeliveryCounter() {
556        return redeliveryCounter;
557    }
558
559    public void setRedeliveryCounter(int deliveryCounter) {
560        this.redeliveryCounter = deliveryCounter;
561    }
562
563    /**
564     * The route of brokers the command has moved through.
565     *
566     * @openwire:property version=1 cache=true
567     */
568    public BrokerId[] getBrokerPath() {
569        return brokerPath;
570    }
571
572    public void setBrokerPath(BrokerId[] brokerPath) {
573        this.brokerPath = brokerPath;
574    }
575
576    public boolean isReadOnlyProperties() {
577        return readOnlyProperties;
578    }
579
580    public void setReadOnlyProperties(boolean readOnlyProperties) {
581        this.readOnlyProperties = readOnlyProperties;
582    }
583
584    public boolean isReadOnlyBody() {
585        return readOnlyBody;
586    }
587
588    public void setReadOnlyBody(boolean readOnlyBody) {
589        this.readOnlyBody = readOnlyBody;
590    }
591
592    public ActiveMQConnection getConnection() {
593        return this.connection;
594    }
595
596    public void setConnection(ActiveMQConnection connection) {
597        this.connection = connection;
598    }
599
600    /**
601     * Used to schedule the arrival time of a message to a broker. The broker
602     * will not dispatch a message to a consumer until it's arrival time has
603     * elapsed.
604     *
605     * @openwire:property version=1
606     */
607    public long getArrival() {
608        return arrival;
609    }
610
611    public void setArrival(long arrival) {
612        this.arrival = arrival;
613    }
614
615    /**
616     * Only set by the broker and defines the userID of the producer connection
617     * who sent this message. This is an optional field, it needs to be enabled
618     * on the broker to have this field populated.
619     *
620     * @openwire:property version=1
621     */
622    public String getUserID() {
623        return userID;
624    }
625
626    public void setUserID(String jmsxUserID) {
627        this.userID = jmsxUserID;
628    }
629
630    @Override
631        public int getReferenceCount() {
632        return referenceCount;
633    }
634
635    @Override
636        public Message getMessageHardRef() {
637        return this;
638    }
639
640    @Override
641        public Message getMessage() {
642        return this;
643    }
644
645    public void setRegionDestination(MessageDestination destination) {
646        this.regionDestination = destination;
647        if(this.memoryUsage==null) {
648            this.memoryUsage=destination.getMemoryUsage();
649        }
650    }
651
652    @Override
653    @Transient
654        public MessageDestination getRegionDestination() {
655        return regionDestination;
656    }
657
658    public MemoryUsage getMemoryUsage() {
659        return this.memoryUsage;
660    }
661
662    public void setMemoryUsage(MemoryUsage usage) {
663        this.memoryUsage=usage;
664    }
665
666    @Override
667    public boolean isMarshallAware() {
668        return true;
669    }
670
671    @Override
672        public int incrementReferenceCount() {
673        int rc;
674        int size;
675        synchronized (this) {
676            rc = ++referenceCount;
677            size = getSize();
678        }
679
680        if (rc == 1 && getMemoryUsage() != null) {
681            getMemoryUsage().increaseUsage(size);
682            //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
683
684        }
685
686        //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
687        return rc;
688    }
689
690    @Override
691        public int decrementReferenceCount() {
692        int rc;
693        int size;
694        synchronized (this) {
695            rc = --referenceCount;
696            size = getSize();
697        }
698
699        if (rc == 0 && getMemoryUsage() != null) {
700            getMemoryUsage().decreaseUsage(size);
701            //Thread.dumpStack();
702            //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
703        }
704
705        //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
706
707        return rc;
708    }
709
710    @Override
711        public int getSize() {
712        int minimumMessageSize = getMinimumMessageSize();
713        if (size < minimumMessageSize || size == 0) {
714            size = minimumMessageSize;
715            if (marshalledProperties != null) {
716                size += marshalledProperties.getLength();
717            }
718            if (content != null) {
719                size += content.getLength();
720            }
721        }
722        return size;
723    }
724
725    protected int getMinimumMessageSize() {
726        int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
727        //let destination override
728        MessageDestination dest = regionDestination;
729        if (dest != null) {
730            result=dest.getMinimumMessageSize();
731        }
732        return result;
733    }
734
735    /**
736     * @openwire:property version=1
737     * @return Returns the recievedByDFBridge.
738     */
739    public boolean isRecievedByDFBridge() {
740        return recievedByDFBridge;
741    }
742
743    /**
744     * @param recievedByDFBridge The recievedByDFBridge to set.
745     */
746    public void setRecievedByDFBridge(boolean recievedByDFBridge) {
747        this.recievedByDFBridge = recievedByDFBridge;
748    }
749
750    public void onMessageRolledBack() {
751        incrementRedeliveryCounter();
752    }
753
754    /**
755     * @openwire:property version=2 cache=true
756     */
757    public boolean isDroppable() {
758        return droppable;
759    }
760
761    public void setDroppable(boolean droppable) {
762        this.droppable = droppable;
763    }
764
765    /**
766     * If a message is stored in multiple nodes on a cluster, all the cluster
767     * members will be listed here. Otherwise, it will be null.
768     *
769     * @openwire:property version=3 cache=true
770     */
771    public BrokerId[] getCluster() {
772        return cluster;
773    }
774
775    public void setCluster(BrokerId[] cluster) {
776        this.cluster = cluster;
777    }
778
779    @Override
780    public boolean isMessage() {
781        return true;
782    }
783
784    /**
785     * @openwire:property version=3
786     */
787    public long getBrokerInTime() {
788        return this.brokerInTime;
789    }
790
791    public void setBrokerInTime(long brokerInTime) {
792        this.brokerInTime = brokerInTime;
793    }
794
795    /**
796     * @openwire:property version=3
797     */
798    public long getBrokerOutTime() {
799        return this.brokerOutTime;
800    }
801
802    public void setBrokerOutTime(long brokerOutTime) {
803        this.brokerOutTime = brokerOutTime;
804    }
805
806    @Override
807        public boolean isDropped() {
808        return false;
809    }
810
811    /**
812     * @openwire:property version=10
813     */
814    public boolean isJMSXGroupFirstForConsumer() {
815        return jmsXGroupFirstForConsumer;
816    }
817
818    public void setJMSXGroupFirstForConsumer(boolean val) {
819        jmsXGroupFirstForConsumer = val;
820    }
821
822    public void compress() throws IOException {
823        if (!isCompressed()) {
824            storeContent();
825            if (!isCompressed() && getContent() != null) {
826                doCompress();
827            }
828        }
829    }
830
831    protected void doCompress() throws IOException {
832        compressed = true;
833        ByteSequence bytes = getContent();
834        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
835        OutputStream os = new DeflaterOutputStream(bytesOut);
836        os.write(bytes.data, bytes.offset, bytes.length);
837        os.close();
838        setContent(bytesOut.toByteSequence());
839    }
840
841    @Override
842    public String toString() {
843        return toString(null);
844    }
845
846    @Override
847    public String toString(Map<String, Object>overrideFields) {
848        try {
849            getProperties();
850        } catch (IOException e) {
851        }
852        return super.toString(overrideFields);
853    }
854
855    @Override
856    public boolean canProcessAsExpired() {
857        return processAsExpired.compareAndSet(false, true);
858    }
859}