001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.io.OutputStream;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.zip.DeflaterOutputStream;
027
028import javax.jms.JMSException;
029
030import org.apache.activemq.ActiveMQConnection;
031import org.apache.activemq.advisory.AdvisorySupport;
032import org.apache.activemq.broker.region.MessageReference;
033import org.apache.activemq.usage.MemoryUsage;
034import org.apache.activemq.util.ByteArrayInputStream;
035import org.apache.activemq.util.ByteArrayOutputStream;
036import org.apache.activemq.util.ByteSequence;
037import org.apache.activemq.util.MarshallingSupport;
038import org.apache.activemq.wireformat.WireFormat;
039import org.fusesource.hawtbuf.UTF8Buffer;
040
041/**
042 * Represents an ActiveMQ message
043 *
044 * @openwire:marshaller
045 *
046 */
047public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
048    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
049
050    /**
051     * The default minimum amount of memory a message is assumed to use
052     */
053    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
054
055    protected MessageId messageId;
056    protected ActiveMQDestination originalDestination;
057    protected TransactionId originalTransactionId;
058
059    protected ProducerId producerId;
060    protected ActiveMQDestination destination;
061    protected TransactionId transactionId;
062
063    protected long expiration;
064    protected long timestamp;
065    protected long arrival;
066    protected long brokerInTime;
067    protected long brokerOutTime;
068    protected String correlationId;
069    protected ActiveMQDestination replyTo;
070    protected boolean persistent;
071    protected String type;
072    protected byte priority;
073    protected String groupID;
074    protected int groupSequence;
075    protected ConsumerId targetConsumerId;
076    protected boolean compressed;
077    protected String userID;
078
079    protected ByteSequence content;
080    protected ByteSequence marshalledProperties;
081    protected DataStructure dataStructure;
082    protected int redeliveryCounter;
083
084    protected int size;
085    protected Map<String, Object> properties;
086    protected boolean readOnlyProperties;
087    protected boolean readOnlyBody;
088    protected transient boolean recievedByDFBridge;
089    protected boolean droppable;
090    protected boolean jmsXGroupFirstForConsumer;
091
092    private transient short referenceCount;
093    private transient ActiveMQConnection connection;
094    transient MessageDestination regionDestination;
095    transient MemoryUsage memoryUsage;
096
097    private BrokerId[] brokerPath;
098    private BrokerId[] cluster;
099
100    public static interface MessageDestination {
101        int getMinimumMessageSize();
102        MemoryUsage getMemoryUsage();
103    }
104
105    public abstract Message copy();
106    public abstract void clearBody() throws JMSException;
107    public abstract void storeContent();
108    public abstract void storeContentAndClear();
109
110    // useful to reduce the memory footprint of a persisted message
111    public void clearMarshalledState() throws JMSException {
112        properties = null;
113    }
114
115    protected void copy(Message copy) {
116        super.copy(copy);
117        copy.producerId = producerId;
118        copy.transactionId = transactionId;
119        copy.destination = destination;
120        copy.messageId = messageId != null ? messageId.copy() : null;
121        copy.originalDestination = originalDestination;
122        copy.originalTransactionId = originalTransactionId;
123        copy.expiration = expiration;
124        copy.timestamp = timestamp;
125        copy.correlationId = correlationId;
126        copy.replyTo = replyTo;
127        copy.persistent = persistent;
128        copy.redeliveryCounter = redeliveryCounter;
129        copy.type = type;
130        copy.priority = priority;
131        copy.size = size;
132        copy.groupID = groupID;
133        copy.userID = userID;
134        copy.groupSequence = groupSequence;
135
136        if (properties != null) {
137            copy.properties = new HashMap<String, Object>(properties);
138
139            // The new message hasn't expired, so remove this feild.
140            copy.properties.remove(ORIGINAL_EXPIRATION);
141        } else {
142            copy.properties = properties;
143        }
144
145        copy.content = content;
146        copy.marshalledProperties = marshalledProperties;
147        copy.dataStructure = dataStructure;
148        copy.readOnlyProperties = readOnlyProperties;
149        copy.readOnlyBody = readOnlyBody;
150        copy.compressed = compressed;
151        copy.recievedByDFBridge = recievedByDFBridge;
152
153        copy.arrival = arrival;
154        copy.connection = connection;
155        copy.regionDestination = regionDestination;
156        copy.brokerInTime = brokerInTime;
157        copy.brokerOutTime = brokerOutTime;
158        copy.memoryUsage=this.memoryUsage;
159        copy.brokerPath = brokerPath;
160        copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer;
161
162        // lets not copy the following fields
163        // copy.targetConsumerId = targetConsumerId;
164        // copy.referenceCount = referenceCount;
165    }
166
167    public Object getProperty(String name) throws IOException {
168        if (properties == null) {
169            if (marshalledProperties == null) {
170                return null;
171            }
172            properties = unmarsallProperties(marshalledProperties);
173        }
174        Object result = properties.get(name);
175        if (result instanceof UTF8Buffer) {
176            result = result.toString();
177        }
178
179        return result;
180    }
181
182    @SuppressWarnings("unchecked")
183    public Map<String, Object> getProperties() throws IOException {
184        if (properties == null) {
185            if (marshalledProperties == null) {
186                return Collections.EMPTY_MAP;
187            }
188            properties = unmarsallProperties(marshalledProperties);
189        }
190        return Collections.unmodifiableMap(properties);
191    }
192
193    public void clearProperties() {
194        marshalledProperties = null;
195        properties = null;
196    }
197
198    public void setProperty(String name, Object value) throws IOException {
199        lazyCreateProperties();
200        properties.put(name, value);
201    }
202
203    public void removeProperty(String name) throws IOException {
204        lazyCreateProperties();
205        properties.remove(name);
206    }
207
208    protected void lazyCreateProperties() throws IOException {
209        if (properties == null) {
210            if (marshalledProperties == null) {
211                properties = new HashMap<String, Object>();
212            } else {
213                properties = unmarsallProperties(marshalledProperties);
214                marshalledProperties = null;
215            }
216        } else {
217            marshalledProperties = null;
218        }
219    }
220
221    private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
222        return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
223    }
224
225    @Override
226        public void beforeMarshall(WireFormat wireFormat) throws IOException {
227        // Need to marshal the properties.
228        if (marshalledProperties == null && properties != null) {
229            ByteArrayOutputStream baos = new ByteArrayOutputStream();
230            DataOutputStream os = new DataOutputStream(baos);
231            MarshallingSupport.marshalPrimitiveMap(properties, os);
232            os.close();
233            marshalledProperties = baos.toByteSequence();
234        }
235    }
236
237    @Override
238        public void afterMarshall(WireFormat wireFormat) throws IOException {
239    }
240
241    @Override
242        public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
243    }
244
245    @Override
246        public void afterUnmarshall(WireFormat wireFormat) throws IOException {
247    }
248
249    // /////////////////////////////////////////////////////////////////
250    //
251    // Simple Field accessors
252    //
253    // /////////////////////////////////////////////////////////////////
254
255    /**
256     * @openwire:property version=1 cache=true
257     */
258    public ProducerId getProducerId() {
259        return producerId;
260    }
261
262    public void setProducerId(ProducerId producerId) {
263        this.producerId = producerId;
264    }
265
266    /**
267     * @openwire:property version=1 cache=true
268     */
269    public ActiveMQDestination getDestination() {
270        return destination;
271    }
272
273    public void setDestination(ActiveMQDestination destination) {
274        this.destination = destination;
275    }
276
277    /**
278     * @openwire:property version=1 cache=true
279     */
280    public TransactionId getTransactionId() {
281        return transactionId;
282    }
283
284    public void setTransactionId(TransactionId transactionId) {
285        this.transactionId = transactionId;
286    }
287
288    public boolean isInTransaction() {
289        return transactionId != null;
290    }
291
292    /**
293     * @openwire:property version=1 cache=true
294     */
295    public ActiveMQDestination getOriginalDestination() {
296        return originalDestination;
297    }
298
299    public void setOriginalDestination(ActiveMQDestination destination) {
300        this.originalDestination = destination;
301    }
302
303    /**
304     * @openwire:property version=1
305     */
306    @Override
307        public MessageId getMessageId() {
308        return messageId;
309    }
310
311    public void setMessageId(MessageId messageId) {
312        this.messageId = messageId;
313    }
314
315    /**
316     * @openwire:property version=1 cache=true
317     */
318    public TransactionId getOriginalTransactionId() {
319        return originalTransactionId;
320    }
321
322    public void setOriginalTransactionId(TransactionId transactionId) {
323        this.originalTransactionId = transactionId;
324    }
325
326    /**
327     * @openwire:property version=1
328     */
329    @Override
330        public String getGroupID() {
331        return groupID;
332    }
333
334    public void setGroupID(String groupID) {
335        this.groupID = groupID;
336    }
337
338    /**
339     * @openwire:property version=1
340     */
341    @Override
342        public int getGroupSequence() {
343        return groupSequence;
344    }
345
346    public void setGroupSequence(int groupSequence) {
347        this.groupSequence = groupSequence;
348    }
349
350    /**
351     * @openwire:property version=1
352     */
353    public String getCorrelationId() {
354        return correlationId;
355    }
356
357    public void setCorrelationId(String correlationId) {
358        this.correlationId = correlationId;
359    }
360
361    /**
362     * @openwire:property version=1
363     */
364    @Override
365        public boolean isPersistent() {
366        return persistent;
367    }
368
369    public void setPersistent(boolean deliveryMode) {
370        this.persistent = deliveryMode;
371    }
372
373    /**
374     * @openwire:property version=1
375     */
376    @Override
377        public long getExpiration() {
378        return expiration;
379    }
380
381    public void setExpiration(long expiration) {
382        this.expiration = expiration;
383    }
384
385    /**
386     * @openwire:property version=1
387     */
388    public byte getPriority() {
389        return priority;
390    }
391
392    public void setPriority(byte priority) {
393        if (priority < 0) {
394            this.priority = 0;
395        } else if (priority > 9) {
396            this.priority = 9;
397        } else {
398            this.priority = priority;
399        }
400    }
401
402    /**
403     * @openwire:property version=1
404     */
405    public ActiveMQDestination getReplyTo() {
406        return replyTo;
407    }
408
409    public void setReplyTo(ActiveMQDestination replyTo) {
410        this.replyTo = replyTo;
411    }
412
413    /**
414     * @openwire:property version=1
415     */
416    public long getTimestamp() {
417        return timestamp;
418    }
419
420    public void setTimestamp(long timestamp) {
421        this.timestamp = timestamp;
422    }
423
424    /**
425     * @openwire:property version=1
426     */
427    public String getType() {
428        return type;
429    }
430
431    public void setType(String type) {
432        this.type = type;
433    }
434
435    /**
436     * @openwire:property version=1
437     */
438    public ByteSequence getContent() {
439        return content;
440    }
441
442    public void setContent(ByteSequence content) {
443        this.content = content;
444    }
445
446    /**
447     * @openwire:property version=1
448     */
449    public ByteSequence getMarshalledProperties() {
450        return marshalledProperties;
451    }
452
453    public void setMarshalledProperties(ByteSequence marshalledProperties) {
454        this.marshalledProperties = marshalledProperties;
455    }
456
457    /**
458     * @openwire:property version=1
459     */
460    public DataStructure getDataStructure() {
461        return dataStructure;
462    }
463
464    public void setDataStructure(DataStructure data) {
465        this.dataStructure = data;
466    }
467
468    /**
469     * Can be used to route the message to a specific consumer. Should be null
470     * to allow the broker use normal JMS routing semantics. If the target
471     * consumer id is an active consumer on the broker, the message is dropped.
472     * Used by the AdvisoryBroker to replay advisory messages to a specific
473     * consumer.
474     *
475     * @openwire:property version=1 cache=true
476     */
477    @Override
478        public ConsumerId getTargetConsumerId() {
479        return targetConsumerId;
480    }
481
482    public void setTargetConsumerId(ConsumerId targetConsumerId) {
483        this.targetConsumerId = targetConsumerId;
484    }
485
486    @Override
487        public boolean isExpired() {
488        long expireTime = getExpiration();
489        return expireTime > 0 && System.currentTimeMillis() > expireTime;
490    }
491
492    @Override
493        public boolean isAdvisory() {
494        return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
495    }
496
497    /**
498     * @openwire:property version=1
499     */
500    public boolean isCompressed() {
501        return compressed;
502    }
503
504    public void setCompressed(boolean compressed) {
505        this.compressed = compressed;
506    }
507
508    public boolean isRedelivered() {
509        return redeliveryCounter > 0;
510    }
511
512    public void setRedelivered(boolean redelivered) {
513        if (redelivered) {
514            if (!isRedelivered()) {
515                setRedeliveryCounter(1);
516            }
517        } else {
518            if (isRedelivered()) {
519                setRedeliveryCounter(0);
520            }
521        }
522    }
523
524    @Override
525        public void incrementRedeliveryCounter() {
526        redeliveryCounter++;
527    }
528
529    /**
530     * @openwire:property version=1
531     */
532    @Override
533        public int getRedeliveryCounter() {
534        return redeliveryCounter;
535    }
536
537    public void setRedeliveryCounter(int deliveryCounter) {
538        this.redeliveryCounter = deliveryCounter;
539    }
540
541    /**
542     * The route of brokers the command has moved through.
543     *
544     * @openwire:property version=1 cache=true
545     */
546    public BrokerId[] getBrokerPath() {
547        return brokerPath;
548    }
549
550    public void setBrokerPath(BrokerId[] brokerPath) {
551        this.brokerPath = brokerPath;
552    }
553
554    public boolean isReadOnlyProperties() {
555        return readOnlyProperties;
556    }
557
558    public void setReadOnlyProperties(boolean readOnlyProperties) {
559        this.readOnlyProperties = readOnlyProperties;
560    }
561
562    public boolean isReadOnlyBody() {
563        return readOnlyBody;
564    }
565
566    public void setReadOnlyBody(boolean readOnlyBody) {
567        this.readOnlyBody = readOnlyBody;
568    }
569
570    public ActiveMQConnection getConnection() {
571        return this.connection;
572    }
573
574    public void setConnection(ActiveMQConnection connection) {
575        this.connection = connection;
576    }
577
578    /**
579     * Used to schedule the arrival time of a message to a broker. The broker
580     * will not dispatch a message to a consumer until it's arrival time has
581     * elapsed.
582     *
583     * @openwire:property version=1
584     */
585    public long getArrival() {
586        return arrival;
587    }
588
589    public void setArrival(long arrival) {
590        this.arrival = arrival;
591    }
592
593    /**
594     * Only set by the broker and defines the userID of the producer connection
595     * who sent this message. This is an optional field, it needs to be enabled
596     * on the broker to have this field populated.
597     *
598     * @openwire:property version=1
599     */
600    public String getUserID() {
601        return userID;
602    }
603
604    public void setUserID(String jmsxUserID) {
605        this.userID = jmsxUserID;
606    }
607
608    @Override
609        public int getReferenceCount() {
610        return referenceCount;
611    }
612
613    @Override
614        public Message getMessageHardRef() {
615        return this;
616    }
617
618    @Override
619        public Message getMessage() {
620        return this;
621    }
622
623    public void setRegionDestination(MessageDestination destination) {
624        this.regionDestination = destination;
625        if(this.memoryUsage==null) {
626            this.memoryUsage=destination.getMemoryUsage();
627        }
628    }
629
630    @Override
631        public MessageDestination getRegionDestination() {
632        return regionDestination;
633    }
634
635    public MemoryUsage getMemoryUsage() {
636        return this.memoryUsage;
637    }
638
639    public void setMemoryUsage(MemoryUsage usage) {
640        this.memoryUsage=usage;
641    }
642
643    @Override
644    public boolean isMarshallAware() {
645        return true;
646    }
647
648    @Override
649        public int incrementReferenceCount() {
650        int rc;
651        int size;
652        synchronized (this) {
653            rc = ++referenceCount;
654            size = getSize();
655        }
656
657        if (rc == 1 && getMemoryUsage() != null) {
658            getMemoryUsage().increaseUsage(size);
659            //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
660
661        }
662
663        //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
664        return rc;
665    }
666
667    @Override
668        public int decrementReferenceCount() {
669        int rc;
670        int size;
671        synchronized (this) {
672            rc = --referenceCount;
673            size = getSize();
674        }
675
676        if (rc == 0 && getMemoryUsage() != null) {
677            getMemoryUsage().decreaseUsage(size);
678            //Thread.dumpStack();
679            //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
680        }
681
682        //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
683
684        return rc;
685    }
686
687    @Override
688        public int getSize() {
689        int minimumMessageSize = getMinimumMessageSize();
690        if (size < minimumMessageSize || size == 0) {
691            size = minimumMessageSize;
692            if (marshalledProperties != null) {
693                size += marshalledProperties.getLength();
694            }
695            if (content != null) {
696                size += content.getLength();
697            }
698        }
699        return size;
700    }
701
702    protected int getMinimumMessageSize() {
703        int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
704        //let destination override
705        MessageDestination dest = regionDestination;
706        if (dest != null) {
707            result=dest.getMinimumMessageSize();
708        }
709        return result;
710    }
711
712    /**
713     * @openwire:property version=1
714     * @return Returns the recievedByDFBridge.
715     */
716    public boolean isRecievedByDFBridge() {
717        return recievedByDFBridge;
718    }
719
720    /**
721     * @param recievedByDFBridge The recievedByDFBridge to set.
722     */
723    public void setRecievedByDFBridge(boolean recievedByDFBridge) {
724        this.recievedByDFBridge = recievedByDFBridge;
725    }
726
727    public void onMessageRolledBack() {
728        incrementRedeliveryCounter();
729    }
730
731    /**
732     * @openwire:property version=2 cache=true
733     */
734    public boolean isDroppable() {
735        return droppable;
736    }
737
738    public void setDroppable(boolean droppable) {
739        this.droppable = droppable;
740    }
741
742    /**
743     * If a message is stored in multiple nodes on a cluster, all the cluster
744     * members will be listed here. Otherwise, it will be null.
745     *
746     * @openwire:property version=3 cache=true
747     */
748    public BrokerId[] getCluster() {
749        return cluster;
750    }
751
752    public void setCluster(BrokerId[] cluster) {
753        this.cluster = cluster;
754    }
755
756    @Override
757    public boolean isMessage() {
758        return true;
759    }
760
761    /**
762     * @openwire:property version=3
763     */
764    public long getBrokerInTime() {
765        return this.brokerInTime;
766    }
767
768    public void setBrokerInTime(long brokerInTime) {
769        this.brokerInTime = brokerInTime;
770    }
771
772    /**
773     * @openwire:property version=3
774     */
775    public long getBrokerOutTime() {
776        return this.brokerOutTime;
777    }
778
779    public void setBrokerOutTime(long brokerOutTime) {
780        this.brokerOutTime = brokerOutTime;
781    }
782
783    @Override
784        public boolean isDropped() {
785        return false;
786    }
787
788    /**
789     * @openwire:property version=10
790     */
791    public boolean isJMSXGroupFirstForConsumer() {
792        return jmsXGroupFirstForConsumer;
793    }
794
795    public void setJMSXGroupFirstForConsumer(boolean val) {
796        jmsXGroupFirstForConsumer = val;
797    }
798
799    public void compress() throws IOException {
800        if (!isCompressed()) {
801            storeContent();
802            if (!isCompressed() && getContent() != null) {
803                doCompress();
804            }
805        }
806    }
807
808    protected void doCompress() throws IOException {
809        compressed = true;
810        ByteSequence bytes = getContent();
811        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
812        OutputStream os = new DeflaterOutputStream(bytesOut);
813        os.write(bytes.data, bytes.offset, bytes.length);
814        os.close();
815        setContent(bytesOut.toByteSequence());
816    }
817
818    @Override
819    public String toString() {
820        return toString(null);
821    }
822
823    @Override
824    public String toString(Map<String, Object>overrideFields) {
825        try {
826            getProperties();
827        } catch (IOException e) {
828        }
829        return super.toString(overrideFields);
830    }
831}