001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.beans.Transient;
020import java.io.DataInputStream;
021import java.io.DataOutputStream;
022import java.io.IOException;
023import java.io.OutputStream;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Map;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.zip.DeflaterOutputStream;
029
030import javax.jms.JMSException;
031
032import org.apache.activemq.ActiveMQConnection;
033import org.apache.activemq.advisory.AdvisorySupport;
034import org.apache.activemq.broker.region.MessageReference;
035import org.apache.activemq.usage.MemoryUsage;
036import org.apache.activemq.util.ByteArrayInputStream;
037import org.apache.activemq.util.ByteArrayOutputStream;
038import org.apache.activemq.util.ByteSequence;
039import org.apache.activemq.util.MarshallingSupport;
040import org.apache.activemq.wireformat.WireFormat;
041import org.fusesource.hawtbuf.UTF8Buffer;
042
043/**
044 * Represents an ActiveMQ message
045 *
046 * @openwire:marshaller
047 *
048 */
049public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
050    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
051
052    /**
053     * The default minimum amount of memory a message is assumed to use
054     */
055    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
056
057    protected MessageId messageId;
058    protected ActiveMQDestination originalDestination;
059    protected TransactionId originalTransactionId;
060
061    protected ProducerId producerId;
062    protected ActiveMQDestination destination;
063    protected TransactionId transactionId;
064
065    protected long expiration;
066    protected long timestamp;
067    protected long arrival;
068    protected long brokerInTime;
069    protected long brokerOutTime;
070    protected String correlationId;
071    protected ActiveMQDestination replyTo;
072    protected boolean persistent;
073    protected String type;
074    protected byte priority;
075    protected String groupID;
076    protected int groupSequence;
077    protected ConsumerId targetConsumerId;
078    protected boolean compressed;
079    protected String userID;
080
081    protected ByteSequence content;
082    protected ByteSequence marshalledProperties;
083    protected DataStructure dataStructure;
084    protected int redeliveryCounter;
085
086    protected int size;
087    protected Map<String, Object> properties;
088    protected boolean readOnlyProperties;
089    protected boolean readOnlyBody;
090    protected transient boolean recievedByDFBridge;
091    protected boolean droppable;
092    protected boolean jmsXGroupFirstForConsumer;
093
094    private transient short referenceCount;
095    private transient ActiveMQConnection connection;
096    transient MessageDestination regionDestination;
097    transient MemoryUsage memoryUsage;
098    transient AtomicBoolean processAsExpired = new AtomicBoolean(false);
099
100    private BrokerId[] brokerPath;
101    private BrokerId[] cluster;
102
103    public static interface MessageDestination {
104        int getMinimumMessageSize();
105        MemoryUsage getMemoryUsage();
106    }
107
108    public abstract Message copy();
109    public abstract void clearBody() throws JMSException;
110    public abstract void storeContent();
111    public abstract void storeContentAndClear();
112
113    // useful to reduce the memory footprint of a persisted message
114    public void clearMarshalledState() throws JMSException {
115        properties = null;
116    }
117
118    protected void copy(Message copy) {
119        super.copy(copy);
120        copy.producerId = producerId;
121        copy.transactionId = transactionId;
122        copy.destination = destination;
123        copy.messageId = messageId != null ? messageId.copy() : null;
124        copy.originalDestination = originalDestination;
125        copy.originalTransactionId = originalTransactionId;
126        copy.expiration = expiration;
127        copy.timestamp = timestamp;
128        copy.correlationId = correlationId;
129        copy.replyTo = replyTo;
130        copy.persistent = persistent;
131        copy.redeliveryCounter = redeliveryCounter;
132        copy.type = type;
133        copy.priority = priority;
134        copy.size = size;
135        copy.groupID = groupID;
136        copy.userID = userID;
137        copy.groupSequence = groupSequence;
138
139        if (properties != null) {
140            copy.properties = new HashMap<String, Object>(properties);
141
142            // The new message hasn't expired, so remove this feild.
143            copy.properties.remove(ORIGINAL_EXPIRATION);
144        } else {
145            copy.properties = properties;
146        }
147
148        copy.content = copyByteSequence(content);
149        copy.marshalledProperties = copyByteSequence(marshalledProperties);
150        copy.dataStructure = dataStructure;
151        copy.readOnlyProperties = readOnlyProperties;
152        copy.readOnlyBody = readOnlyBody;
153        copy.compressed = compressed;
154        copy.recievedByDFBridge = recievedByDFBridge;
155
156        copy.arrival = arrival;
157        copy.connection = connection;
158        copy.regionDestination = regionDestination;
159        copy.brokerInTime = brokerInTime;
160        copy.brokerOutTime = brokerOutTime;
161        copy.memoryUsage=this.memoryUsage;
162        copy.brokerPath = brokerPath;
163        copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer;
164
165        // lets not copy the following fields
166        // copy.targetConsumerId = targetConsumerId;
167        // copy.referenceCount = referenceCount;
168    }
169
170    private ByteSequence copyByteSequence(ByteSequence content) {
171        if (content != null) {
172            return new ByteSequence(content.getData(), content.getOffset(), content.getLength());
173        }
174        return null;
175    }
176
177    public Object getProperty(String name) throws IOException {
178        if (properties == null) {
179            if (marshalledProperties == null) {
180                return null;
181            }
182            properties = unmarsallProperties(marshalledProperties);
183        }
184        Object result = properties.get(name);
185        if (result instanceof UTF8Buffer) {
186            result = result.toString();
187        }
188
189        return result;
190    }
191
192    @SuppressWarnings("unchecked")
193    public Map<String, Object> getProperties() throws IOException {
194        if (properties == null) {
195            if (marshalledProperties == null) {
196                return Collections.EMPTY_MAP;
197            }
198            properties = unmarsallProperties(marshalledProperties);
199        }
200        return Collections.unmodifiableMap(properties);
201    }
202
203    public void clearProperties() {
204        marshalledProperties = null;
205        properties = null;
206    }
207
208    public void setProperty(String name, Object value) throws IOException {
209        lazyCreateProperties();
210        properties.put(name, value);
211    }
212
213    public void removeProperty(String name) throws IOException {
214        lazyCreateProperties();
215        properties.remove(name);
216    }
217
218    protected void lazyCreateProperties() throws IOException {
219        if (properties == null) {
220            if (marshalledProperties == null) {
221                properties = new HashMap<String, Object>();
222            } else {
223                properties = unmarsallProperties(marshalledProperties);
224                marshalledProperties = null;
225            }
226        } else {
227            marshalledProperties = null;
228        }
229    }
230
231    private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
232        return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
233    }
234
235    @Override
236        public void beforeMarshall(WireFormat wireFormat) throws IOException {
237        // Need to marshal the properties.
238        if (marshalledProperties == null && properties != null) {
239            ByteArrayOutputStream baos = new ByteArrayOutputStream();
240            DataOutputStream os = new DataOutputStream(baos);
241            MarshallingSupport.marshalPrimitiveMap(properties, os);
242            os.close();
243            marshalledProperties = baos.toByteSequence();
244        }
245    }
246
247    @Override
248        public void afterMarshall(WireFormat wireFormat) throws IOException {
249    }
250
251    @Override
252        public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
253    }
254
255    @Override
256        public void afterUnmarshall(WireFormat wireFormat) throws IOException {
257    }
258
259    // /////////////////////////////////////////////////////////////////
260    //
261    // Simple Field accessors
262    //
263    // /////////////////////////////////////////////////////////////////
264
265    /**
266     * @openwire:property version=1 cache=true
267     */
268    public ProducerId getProducerId() {
269        return producerId;
270    }
271
272    public void setProducerId(ProducerId producerId) {
273        this.producerId = producerId;
274    }
275
276    /**
277     * @openwire:property version=1 cache=true
278     */
279    public ActiveMQDestination getDestination() {
280        return destination;
281    }
282
283    public void setDestination(ActiveMQDestination destination) {
284        this.destination = destination;
285    }
286
287    /**
288     * @openwire:property version=1 cache=true
289     */
290    public TransactionId getTransactionId() {
291        return transactionId;
292    }
293
294    public void setTransactionId(TransactionId transactionId) {
295        this.transactionId = transactionId;
296    }
297
298    public boolean isInTransaction() {
299        return transactionId != null;
300    }
301
302    /**
303     * @openwire:property version=1 cache=true
304     */
305    public ActiveMQDestination getOriginalDestination() {
306        return originalDestination;
307    }
308
309    public void setOriginalDestination(ActiveMQDestination destination) {
310        this.originalDestination = destination;
311    }
312
313    /**
314     * @openwire:property version=1
315     */
316    @Override
317        public MessageId getMessageId() {
318        return messageId;
319    }
320
321    public void setMessageId(MessageId messageId) {
322        this.messageId = messageId;
323    }
324
325    /**
326     * @openwire:property version=1 cache=true
327     */
328    public TransactionId getOriginalTransactionId() {
329        return originalTransactionId;
330    }
331
332    public void setOriginalTransactionId(TransactionId transactionId) {
333        this.originalTransactionId = transactionId;
334    }
335
336    /**
337     * @openwire:property version=1
338     */
339    @Override
340        public String getGroupID() {
341        return groupID;
342    }
343
344    public void setGroupID(String groupID) {
345        this.groupID = groupID;
346    }
347
348    /**
349     * @openwire:property version=1
350     */
351    @Override
352        public int getGroupSequence() {
353        return groupSequence;
354    }
355
356    public void setGroupSequence(int groupSequence) {
357        this.groupSequence = groupSequence;
358    }
359
360    /**
361     * @openwire:property version=1
362     */
363    public String getCorrelationId() {
364        return correlationId;
365    }
366
367    public void setCorrelationId(String correlationId) {
368        this.correlationId = correlationId;
369    }
370
371    /**
372     * @openwire:property version=1
373     */
374    @Override
375        public boolean isPersistent() {
376        return persistent;
377    }
378
379    public void setPersistent(boolean deliveryMode) {
380        this.persistent = deliveryMode;
381    }
382
383    /**
384     * @openwire:property version=1
385     */
386    @Override
387        public long getExpiration() {
388        return expiration;
389    }
390
391    public void setExpiration(long expiration) {
392        this.expiration = expiration;
393    }
394
395    /**
396     * @openwire:property version=1
397     */
398    public byte getPriority() {
399        return priority;
400    }
401
402    public void setPriority(byte priority) {
403        if (priority < 0) {
404            this.priority = 0;
405        } else if (priority > 9) {
406            this.priority = 9;
407        } else {
408            this.priority = priority;
409        }
410    }
411
412    /**
413     * @openwire:property version=1
414     */
415    public ActiveMQDestination getReplyTo() {
416        return replyTo;
417    }
418
419    public void setReplyTo(ActiveMQDestination replyTo) {
420        this.replyTo = replyTo;
421    }
422
423    /**
424     * @openwire:property version=1
425     */
426    public long getTimestamp() {
427        return timestamp;
428    }
429
430    public void setTimestamp(long timestamp) {
431        this.timestamp = timestamp;
432    }
433
434    /**
435     * @openwire:property version=1
436     */
437    public String getType() {
438        return type;
439    }
440
441    public void setType(String type) {
442        this.type = type;
443    }
444
445    /**
446     * @openwire:property version=1
447     */
448    public ByteSequence getContent() {
449        return content;
450    }
451
452    public void setContent(ByteSequence content) {
453        this.content = content;
454    }
455
456    /**
457     * @openwire:property version=1
458     */
459    public ByteSequence getMarshalledProperties() {
460        return marshalledProperties;
461    }
462
463    public void setMarshalledProperties(ByteSequence marshalledProperties) {
464        this.marshalledProperties = marshalledProperties;
465    }
466
467    /**
468     * @openwire:property version=1
469     */
470    public DataStructure getDataStructure() {
471        return dataStructure;
472    }
473
474    public void setDataStructure(DataStructure data) {
475        this.dataStructure = data;
476    }
477
478    /**
479     * Can be used to route the message to a specific consumer. Should be null
480     * to allow the broker use normal JMS routing semantics. If the target
481     * consumer id is an active consumer on the broker, the message is dropped.
482     * Used by the AdvisoryBroker to replay advisory messages to a specific
483     * consumer.
484     *
485     * @openwire:property version=1 cache=true
486     */
487    @Override
488        public ConsumerId getTargetConsumerId() {
489        return targetConsumerId;
490    }
491
492    public void setTargetConsumerId(ConsumerId targetConsumerId) {
493        this.targetConsumerId = targetConsumerId;
494    }
495
496    @Override
497        public boolean isExpired() {
498        long expireTime = getExpiration();
499        return expireTime > 0 && System.currentTimeMillis() > expireTime;
500    }
501
502    @Override
503        public boolean isAdvisory() {
504        return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
505    }
506
507    /**
508     * @openwire:property version=1
509     */
510    public boolean isCompressed() {
511        return compressed;
512    }
513
514    public void setCompressed(boolean compressed) {
515        this.compressed = compressed;
516    }
517
518    public boolean isRedelivered() {
519        return redeliveryCounter > 0;
520    }
521
522    public void setRedelivered(boolean redelivered) {
523        if (redelivered) {
524            if (!isRedelivered()) {
525                setRedeliveryCounter(1);
526            }
527        } else {
528            if (isRedelivered()) {
529                setRedeliveryCounter(0);
530            }
531        }
532    }
533
534    @Override
535        public void incrementRedeliveryCounter() {
536        redeliveryCounter++;
537    }
538
539    /**
540     * @openwire:property version=1
541     */
542    @Override
543        public int getRedeliveryCounter() {
544        return redeliveryCounter;
545    }
546
547    public void setRedeliveryCounter(int deliveryCounter) {
548        this.redeliveryCounter = deliveryCounter;
549    }
550
551    /**
552     * The route of brokers the command has moved through.
553     *
554     * @openwire:property version=1 cache=true
555     */
556    public BrokerId[] getBrokerPath() {
557        return brokerPath;
558    }
559
560    public void setBrokerPath(BrokerId[] brokerPath) {
561        this.brokerPath = brokerPath;
562    }
563
564    public boolean isReadOnlyProperties() {
565        return readOnlyProperties;
566    }
567
568    public void setReadOnlyProperties(boolean readOnlyProperties) {
569        this.readOnlyProperties = readOnlyProperties;
570    }
571
572    public boolean isReadOnlyBody() {
573        return readOnlyBody;
574    }
575
576    public void setReadOnlyBody(boolean readOnlyBody) {
577        this.readOnlyBody = readOnlyBody;
578    }
579
580    public ActiveMQConnection getConnection() {
581        return this.connection;
582    }
583
584    public void setConnection(ActiveMQConnection connection) {
585        this.connection = connection;
586    }
587
588    /**
589     * Used to schedule the arrival time of a message to a broker. The broker
590     * will not dispatch a message to a consumer until it's arrival time has
591     * elapsed.
592     *
593     * @openwire:property version=1
594     */
595    public long getArrival() {
596        return arrival;
597    }
598
599    public void setArrival(long arrival) {
600        this.arrival = arrival;
601    }
602
603    /**
604     * Only set by the broker and defines the userID of the producer connection
605     * who sent this message. This is an optional field, it needs to be enabled
606     * on the broker to have this field populated.
607     *
608     * @openwire:property version=1
609     */
610    public String getUserID() {
611        return userID;
612    }
613
614    public void setUserID(String jmsxUserID) {
615        this.userID = jmsxUserID;
616    }
617
618    @Override
619        public int getReferenceCount() {
620        return referenceCount;
621    }
622
623    @Override
624        public Message getMessageHardRef() {
625        return this;
626    }
627
628    @Override
629        public Message getMessage() {
630        return this;
631    }
632
633    public void setRegionDestination(MessageDestination destination) {
634        this.regionDestination = destination;
635        if(this.memoryUsage==null) {
636            this.memoryUsage=destination.getMemoryUsage();
637        }
638    }
639
640    @Override
641    @Transient
642        public MessageDestination getRegionDestination() {
643        return regionDestination;
644    }
645
646    public MemoryUsage getMemoryUsage() {
647        return this.memoryUsage;
648    }
649
650    public void setMemoryUsage(MemoryUsage usage) {
651        this.memoryUsage=usage;
652    }
653
654    @Override
655    public boolean isMarshallAware() {
656        return true;
657    }
658
659    @Override
660        public int incrementReferenceCount() {
661        int rc;
662        int size;
663        synchronized (this) {
664            rc = ++referenceCount;
665            size = getSize();
666        }
667
668        if (rc == 1 && getMemoryUsage() != null) {
669            getMemoryUsage().increaseUsage(size);
670            //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
671
672        }
673
674        //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
675        return rc;
676    }
677
678    @Override
679        public int decrementReferenceCount() {
680        int rc;
681        int size;
682        synchronized (this) {
683            rc = --referenceCount;
684            size = getSize();
685        }
686
687        if (rc == 0 && getMemoryUsage() != null) {
688            getMemoryUsage().decreaseUsage(size);
689            //Thread.dumpStack();
690            //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
691        }
692
693        //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
694
695        return rc;
696    }
697
698    @Override
699        public int getSize() {
700        int minimumMessageSize = getMinimumMessageSize();
701        if (size < minimumMessageSize || size == 0) {
702            size = minimumMessageSize;
703            if (marshalledProperties != null) {
704                size += marshalledProperties.getLength();
705            }
706            if (content != null) {
707                size += content.getLength();
708            }
709        }
710        return size;
711    }
712
713    protected int getMinimumMessageSize() {
714        int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
715        //let destination override
716        MessageDestination dest = regionDestination;
717        if (dest != null) {
718            result=dest.getMinimumMessageSize();
719        }
720        return result;
721    }
722
723    /**
724     * @openwire:property version=1
725     * @return Returns the recievedByDFBridge.
726     */
727    public boolean isRecievedByDFBridge() {
728        return recievedByDFBridge;
729    }
730
731    /**
732     * @param recievedByDFBridge The recievedByDFBridge to set.
733     */
734    public void setRecievedByDFBridge(boolean recievedByDFBridge) {
735        this.recievedByDFBridge = recievedByDFBridge;
736    }
737
738    public void onMessageRolledBack() {
739        incrementRedeliveryCounter();
740    }
741
742    /**
743     * @openwire:property version=2 cache=true
744     */
745    public boolean isDroppable() {
746        return droppable;
747    }
748
749    public void setDroppable(boolean droppable) {
750        this.droppable = droppable;
751    }
752
753    /**
754     * If a message is stored in multiple nodes on a cluster, all the cluster
755     * members will be listed here. Otherwise, it will be null.
756     *
757     * @openwire:property version=3 cache=true
758     */
759    public BrokerId[] getCluster() {
760        return cluster;
761    }
762
763    public void setCluster(BrokerId[] cluster) {
764        this.cluster = cluster;
765    }
766
767    @Override
768    public boolean isMessage() {
769        return true;
770    }
771
772    /**
773     * @openwire:property version=3
774     */
775    public long getBrokerInTime() {
776        return this.brokerInTime;
777    }
778
779    public void setBrokerInTime(long brokerInTime) {
780        this.brokerInTime = brokerInTime;
781    }
782
783    /**
784     * @openwire:property version=3
785     */
786    public long getBrokerOutTime() {
787        return this.brokerOutTime;
788    }
789
790    public void setBrokerOutTime(long brokerOutTime) {
791        this.brokerOutTime = brokerOutTime;
792    }
793
794    @Override
795        public boolean isDropped() {
796        return false;
797    }
798
799    /**
800     * @openwire:property version=10
801     */
802    public boolean isJMSXGroupFirstForConsumer() {
803        return jmsXGroupFirstForConsumer;
804    }
805
806    public void setJMSXGroupFirstForConsumer(boolean val) {
807        jmsXGroupFirstForConsumer = val;
808    }
809
810    public void compress() throws IOException {
811        if (!isCompressed()) {
812            storeContent();
813            if (!isCompressed() && getContent() != null) {
814                doCompress();
815            }
816        }
817    }
818
819    protected void doCompress() throws IOException {
820        compressed = true;
821        ByteSequence bytes = getContent();
822        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
823        OutputStream os = new DeflaterOutputStream(bytesOut);
824        os.write(bytes.data, bytes.offset, bytes.length);
825        os.close();
826        setContent(bytesOut.toByteSequence());
827    }
828
829    @Override
830    public String toString() {
831        return toString(null);
832    }
833
834    @Override
835    public String toString(Map<String, Object>overrideFields) {
836        try {
837            getProperties();
838        } catch (IOException e) {
839        }
840        return super.toString(overrideFields);
841    }
842
843    @Override
844    public boolean canProcessAsExpired() {
845        return processAsExpired.compareAndSet(false, true);
846    }
847}