001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.io.OutputStream;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.zip.DeflaterOutputStream;
027
028import javax.jms.JMSException;
029
030import org.apache.activemq.ActiveMQConnection;
031import org.apache.activemq.advisory.AdvisorySupport;
032import org.apache.activemq.broker.region.MessageReference;
033import org.apache.activemq.usage.MemoryUsage;
034import org.apache.activemq.util.ByteArrayInputStream;
035import org.apache.activemq.util.ByteArrayOutputStream;
036import org.apache.activemq.util.ByteSequence;
037import org.apache.activemq.util.MarshallingSupport;
038import org.apache.activemq.wireformat.WireFormat;
039import org.fusesource.hawtbuf.UTF8Buffer;
040
041/**
042 * Represents an ActiveMQ message
043 *
044 * @openwire:marshaller
045 *
046 */
047public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
048    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
049
050    /**
051     * The default minimum amount of memory a message is assumed to use
052     */
053    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
054
055    protected MessageId messageId;
056    protected ActiveMQDestination originalDestination;
057    protected TransactionId originalTransactionId;
058
059    protected ProducerId producerId;
060    protected ActiveMQDestination destination;
061    protected TransactionId transactionId;
062
063    protected long expiration;
064    protected long timestamp;
065    protected long arrival;
066    protected long brokerInTime;
067    protected long brokerOutTime;
068    protected String correlationId;
069    protected ActiveMQDestination replyTo;
070    protected boolean persistent;
071    protected String type;
072    protected byte priority;
073    protected String groupID;
074    protected int groupSequence;
075    protected ConsumerId targetConsumerId;
076    protected boolean compressed;
077    protected String userID;
078
079    protected ByteSequence content;
080    protected ByteSequence marshalledProperties;
081    protected DataStructure dataStructure;
082    protected int redeliveryCounter;
083
084    protected int size;
085    protected Map<String, Object> properties;
086    protected boolean readOnlyProperties;
087    protected boolean readOnlyBody;
088    protected transient boolean recievedByDFBridge;
089    protected boolean droppable;
090    protected boolean jmsXGroupFirstForConsumer;
091
092    private transient short referenceCount;
093    private transient ActiveMQConnection connection;
094    transient MessageDestination regionDestination;
095    transient MemoryUsage memoryUsage;
096
097    private BrokerId[] brokerPath;
098    private BrokerId[] cluster;
099
100    public static interface MessageDestination {
101        int getMinimumMessageSize();
102        MemoryUsage getMemoryUsage();
103    }
104
105    public abstract Message copy();
106    public abstract void clearBody() throws JMSException;
107    public abstract void storeContent();
108    public abstract void storeContentAndClear();
109
110    // useful to reduce the memory footprint of a persisted message
111    public void clearMarshalledState() throws JMSException {
112        properties = null;
113    }
114
115    protected void copy(Message copy) {
116        super.copy(copy);
117        copy.producerId = producerId;
118        copy.transactionId = transactionId;
119        copy.destination = destination;
120        copy.messageId = messageId != null ? messageId.copy() : null;
121        copy.originalDestination = originalDestination;
122        copy.originalTransactionId = originalTransactionId;
123        copy.expiration = expiration;
124        copy.timestamp = timestamp;
125        copy.correlationId = correlationId;
126        copy.replyTo = replyTo;
127        copy.persistent = persistent;
128        copy.redeliveryCounter = redeliveryCounter;
129        copy.type = type;
130        copy.priority = priority;
131        copy.size = size;
132        copy.groupID = groupID;
133        copy.userID = userID;
134        copy.groupSequence = groupSequence;
135
136        if (properties != null) {
137            copy.properties = new HashMap<String, Object>(properties);
138
139            // The new message hasn't expired, so remove this feild.
140            copy.properties.remove(ORIGINAL_EXPIRATION);
141        } else {
142            copy.properties = properties;
143        }
144
145        copy.content = copyByteSequence(content);
146        copy.marshalledProperties = copyByteSequence(marshalledProperties);
147        copy.dataStructure = dataStructure;
148        copy.readOnlyProperties = readOnlyProperties;
149        copy.readOnlyBody = readOnlyBody;
150        copy.compressed = compressed;
151        copy.recievedByDFBridge = recievedByDFBridge;
152
153        copy.arrival = arrival;
154        copy.connection = connection;
155        copy.regionDestination = regionDestination;
156        copy.brokerInTime = brokerInTime;
157        copy.brokerOutTime = brokerOutTime;
158        copy.memoryUsage=this.memoryUsage;
159        copy.brokerPath = brokerPath;
160        copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer;
161
162        // lets not copy the following fields
163        // copy.targetConsumerId = targetConsumerId;
164        // copy.referenceCount = referenceCount;
165    }
166
167    private ByteSequence copyByteSequence(ByteSequence content) {
168        if (content != null) {
169            return new ByteSequence(content.getData(), content.getOffset(), content.getLength());
170        }
171        return null;
172    }
173
174    public Object getProperty(String name) throws IOException {
175        if (properties == null) {
176            if (marshalledProperties == null) {
177                return null;
178            }
179            properties = unmarsallProperties(marshalledProperties);
180        }
181        Object result = properties.get(name);
182        if (result instanceof UTF8Buffer) {
183            result = result.toString();
184        }
185
186        return result;
187    }
188
189    @SuppressWarnings("unchecked")
190    public Map<String, Object> getProperties() throws IOException {
191        if (properties == null) {
192            if (marshalledProperties == null) {
193                return Collections.EMPTY_MAP;
194            }
195            properties = unmarsallProperties(marshalledProperties);
196        }
197        return Collections.unmodifiableMap(properties);
198    }
199
200    public void clearProperties() {
201        marshalledProperties = null;
202        properties = null;
203    }
204
205    public void setProperty(String name, Object value) throws IOException {
206        lazyCreateProperties();
207        properties.put(name, value);
208    }
209
210    public void removeProperty(String name) throws IOException {
211        lazyCreateProperties();
212        properties.remove(name);
213    }
214
215    protected void lazyCreateProperties() throws IOException {
216        if (properties == null) {
217            if (marshalledProperties == null) {
218                properties = new HashMap<String, Object>();
219            } else {
220                properties = unmarsallProperties(marshalledProperties);
221                marshalledProperties = null;
222            }
223        } else {
224            marshalledProperties = null;
225        }
226    }
227
228    private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
229        return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
230    }
231
232    @Override
233        public void beforeMarshall(WireFormat wireFormat) throws IOException {
234        // Need to marshal the properties.
235        if (marshalledProperties == null && properties != null) {
236            ByteArrayOutputStream baos = new ByteArrayOutputStream();
237            DataOutputStream os = new DataOutputStream(baos);
238            MarshallingSupport.marshalPrimitiveMap(properties, os);
239            os.close();
240            marshalledProperties = baos.toByteSequence();
241        }
242    }
243
244    @Override
245        public void afterMarshall(WireFormat wireFormat) throws IOException {
246    }
247
248    @Override
249        public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
250    }
251
252    @Override
253        public void afterUnmarshall(WireFormat wireFormat) throws IOException {
254    }
255
256    // /////////////////////////////////////////////////////////////////
257    //
258    // Simple Field accessors
259    //
260    // /////////////////////////////////////////////////////////////////
261
262    /**
263     * @openwire:property version=1 cache=true
264     */
265    public ProducerId getProducerId() {
266        return producerId;
267    }
268
269    public void setProducerId(ProducerId producerId) {
270        this.producerId = producerId;
271    }
272
273    /**
274     * @openwire:property version=1 cache=true
275     */
276    public ActiveMQDestination getDestination() {
277        return destination;
278    }
279
280    public void setDestination(ActiveMQDestination destination) {
281        this.destination = destination;
282    }
283
284    /**
285     * @openwire:property version=1 cache=true
286     */
287    public TransactionId getTransactionId() {
288        return transactionId;
289    }
290
291    public void setTransactionId(TransactionId transactionId) {
292        this.transactionId = transactionId;
293    }
294
295    public boolean isInTransaction() {
296        return transactionId != null;
297    }
298
299    /**
300     * @openwire:property version=1 cache=true
301     */
302    public ActiveMQDestination getOriginalDestination() {
303        return originalDestination;
304    }
305
306    public void setOriginalDestination(ActiveMQDestination destination) {
307        this.originalDestination = destination;
308    }
309
310    /**
311     * @openwire:property version=1
312     */
313    @Override
314        public MessageId getMessageId() {
315        return messageId;
316    }
317
318    public void setMessageId(MessageId messageId) {
319        this.messageId = messageId;
320    }
321
322    /**
323     * @openwire:property version=1 cache=true
324     */
325    public TransactionId getOriginalTransactionId() {
326        return originalTransactionId;
327    }
328
329    public void setOriginalTransactionId(TransactionId transactionId) {
330        this.originalTransactionId = transactionId;
331    }
332
333    /**
334     * @openwire:property version=1
335     */
336    @Override
337        public String getGroupID() {
338        return groupID;
339    }
340
341    public void setGroupID(String groupID) {
342        this.groupID = groupID;
343    }
344
345    /**
346     * @openwire:property version=1
347     */
348    @Override
349        public int getGroupSequence() {
350        return groupSequence;
351    }
352
353    public void setGroupSequence(int groupSequence) {
354        this.groupSequence = groupSequence;
355    }
356
357    /**
358     * @openwire:property version=1
359     */
360    public String getCorrelationId() {
361        return correlationId;
362    }
363
364    public void setCorrelationId(String correlationId) {
365        this.correlationId = correlationId;
366    }
367
368    /**
369     * @openwire:property version=1
370     */
371    @Override
372        public boolean isPersistent() {
373        return persistent;
374    }
375
376    public void setPersistent(boolean deliveryMode) {
377        this.persistent = deliveryMode;
378    }
379
380    /**
381     * @openwire:property version=1
382     */
383    @Override
384        public long getExpiration() {
385        return expiration;
386    }
387
388    public void setExpiration(long expiration) {
389        this.expiration = expiration;
390    }
391
392    /**
393     * @openwire:property version=1
394     */
395    public byte getPriority() {
396        return priority;
397    }
398
399    public void setPriority(byte priority) {
400        if (priority < 0) {
401            this.priority = 0;
402        } else if (priority > 9) {
403            this.priority = 9;
404        } else {
405            this.priority = priority;
406        }
407    }
408
409    /**
410     * @openwire:property version=1
411     */
412    public ActiveMQDestination getReplyTo() {
413        return replyTo;
414    }
415
416    public void setReplyTo(ActiveMQDestination replyTo) {
417        this.replyTo = replyTo;
418    }
419
420    /**
421     * @openwire:property version=1
422     */
423    public long getTimestamp() {
424        return timestamp;
425    }
426
427    public void setTimestamp(long timestamp) {
428        this.timestamp = timestamp;
429    }
430
431    /**
432     * @openwire:property version=1
433     */
434    public String getType() {
435        return type;
436    }
437
438    public void setType(String type) {
439        this.type = type;
440    }
441
442    /**
443     * @openwire:property version=1
444     */
445    public ByteSequence getContent() {
446        return content;
447    }
448
449    public void setContent(ByteSequence content) {
450        this.content = content;
451    }
452
453    /**
454     * @openwire:property version=1
455     */
456    public ByteSequence getMarshalledProperties() {
457        return marshalledProperties;
458    }
459
460    public void setMarshalledProperties(ByteSequence marshalledProperties) {
461        this.marshalledProperties = marshalledProperties;
462    }
463
464    /**
465     * @openwire:property version=1
466     */
467    public DataStructure getDataStructure() {
468        return dataStructure;
469    }
470
471    public void setDataStructure(DataStructure data) {
472        this.dataStructure = data;
473    }
474
475    /**
476     * Can be used to route the message to a specific consumer. Should be null
477     * to allow the broker use normal JMS routing semantics. If the target
478     * consumer id is an active consumer on the broker, the message is dropped.
479     * Used by the AdvisoryBroker to replay advisory messages to a specific
480     * consumer.
481     *
482     * @openwire:property version=1 cache=true
483     */
484    @Override
485        public ConsumerId getTargetConsumerId() {
486        return targetConsumerId;
487    }
488
489    public void setTargetConsumerId(ConsumerId targetConsumerId) {
490        this.targetConsumerId = targetConsumerId;
491    }
492
493    @Override
494        public boolean isExpired() {
495        long expireTime = getExpiration();
496        return expireTime > 0 && System.currentTimeMillis() > expireTime;
497    }
498
499    @Override
500        public boolean isAdvisory() {
501        return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
502    }
503
504    /**
505     * @openwire:property version=1
506     */
507    public boolean isCompressed() {
508        return compressed;
509    }
510
511    public void setCompressed(boolean compressed) {
512        this.compressed = compressed;
513    }
514
515    public boolean isRedelivered() {
516        return redeliveryCounter > 0;
517    }
518
519    public void setRedelivered(boolean redelivered) {
520        if (redelivered) {
521            if (!isRedelivered()) {
522                setRedeliveryCounter(1);
523            }
524        } else {
525            if (isRedelivered()) {
526                setRedeliveryCounter(0);
527            }
528        }
529    }
530
531    @Override
532        public void incrementRedeliveryCounter() {
533        redeliveryCounter++;
534    }
535
536    /**
537     * @openwire:property version=1
538     */
539    @Override
540        public int getRedeliveryCounter() {
541        return redeliveryCounter;
542    }
543
544    public void setRedeliveryCounter(int deliveryCounter) {
545        this.redeliveryCounter = deliveryCounter;
546    }
547
548    /**
549     * The route of brokers the command has moved through.
550     *
551     * @openwire:property version=1 cache=true
552     */
553    public BrokerId[] getBrokerPath() {
554        return brokerPath;
555    }
556
557    public void setBrokerPath(BrokerId[] brokerPath) {
558        this.brokerPath = brokerPath;
559    }
560
561    public boolean isReadOnlyProperties() {
562        return readOnlyProperties;
563    }
564
565    public void setReadOnlyProperties(boolean readOnlyProperties) {
566        this.readOnlyProperties = readOnlyProperties;
567    }
568
569    public boolean isReadOnlyBody() {
570        return readOnlyBody;
571    }
572
573    public void setReadOnlyBody(boolean readOnlyBody) {
574        this.readOnlyBody = readOnlyBody;
575    }
576
577    public ActiveMQConnection getConnection() {
578        return this.connection;
579    }
580
581    public void setConnection(ActiveMQConnection connection) {
582        this.connection = connection;
583    }
584
585    /**
586     * Used to schedule the arrival time of a message to a broker. The broker
587     * will not dispatch a message to a consumer until it's arrival time has
588     * elapsed.
589     *
590     * @openwire:property version=1
591     */
592    public long getArrival() {
593        return arrival;
594    }
595
596    public void setArrival(long arrival) {
597        this.arrival = arrival;
598    }
599
600    /**
601     * Only set by the broker and defines the userID of the producer connection
602     * who sent this message. This is an optional field, it needs to be enabled
603     * on the broker to have this field populated.
604     *
605     * @openwire:property version=1
606     */
607    public String getUserID() {
608        return userID;
609    }
610
611    public void setUserID(String jmsxUserID) {
612        this.userID = jmsxUserID;
613    }
614
615    @Override
616        public int getReferenceCount() {
617        return referenceCount;
618    }
619
620    @Override
621        public Message getMessageHardRef() {
622        return this;
623    }
624
625    @Override
626        public Message getMessage() {
627        return this;
628    }
629
630    public void setRegionDestination(MessageDestination destination) {
631        this.regionDestination = destination;
632        if(this.memoryUsage==null) {
633            this.memoryUsage=destination.getMemoryUsage();
634        }
635    }
636
637    @Override
638        public MessageDestination getRegionDestination() {
639        return regionDestination;
640    }
641
642    public MemoryUsage getMemoryUsage() {
643        return this.memoryUsage;
644    }
645
646    public void setMemoryUsage(MemoryUsage usage) {
647        this.memoryUsage=usage;
648    }
649
650    @Override
651    public boolean isMarshallAware() {
652        return true;
653    }
654
655    @Override
656        public int incrementReferenceCount() {
657        int rc;
658        int size;
659        synchronized (this) {
660            rc = ++referenceCount;
661            size = getSize();
662        }
663
664        if (rc == 1 && getMemoryUsage() != null) {
665            getMemoryUsage().increaseUsage(size);
666            //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
667
668        }
669
670        //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
671        return rc;
672    }
673
674    @Override
675        public int decrementReferenceCount() {
676        int rc;
677        int size;
678        synchronized (this) {
679            rc = --referenceCount;
680            size = getSize();
681        }
682
683        if (rc == 0 && getMemoryUsage() != null) {
684            getMemoryUsage().decreaseUsage(size);
685            //Thread.dumpStack();
686            //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
687        }
688
689        //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
690
691        return rc;
692    }
693
694    @Override
695        public int getSize() {
696        int minimumMessageSize = getMinimumMessageSize();
697        if (size < minimumMessageSize || size == 0) {
698            size = minimumMessageSize;
699            if (marshalledProperties != null) {
700                size += marshalledProperties.getLength();
701            }
702            if (content != null) {
703                size += content.getLength();
704            }
705        }
706        return size;
707    }
708
709    protected int getMinimumMessageSize() {
710        int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
711        //let destination override
712        MessageDestination dest = regionDestination;
713        if (dest != null) {
714            result=dest.getMinimumMessageSize();
715        }
716        return result;
717    }
718
719    /**
720     * @openwire:property version=1
721     * @return Returns the recievedByDFBridge.
722     */
723    public boolean isRecievedByDFBridge() {
724        return recievedByDFBridge;
725    }
726
727    /**
728     * @param recievedByDFBridge The recievedByDFBridge to set.
729     */
730    public void setRecievedByDFBridge(boolean recievedByDFBridge) {
731        this.recievedByDFBridge = recievedByDFBridge;
732    }
733
734    public void onMessageRolledBack() {
735        incrementRedeliveryCounter();
736    }
737
738    /**
739     * @openwire:property version=2 cache=true
740     */
741    public boolean isDroppable() {
742        return droppable;
743    }
744
745    public void setDroppable(boolean droppable) {
746        this.droppable = droppable;
747    }
748
749    /**
750     * If a message is stored in multiple nodes on a cluster, all the cluster
751     * members will be listed here. Otherwise, it will be null.
752     *
753     * @openwire:property version=3 cache=true
754     */
755    public BrokerId[] getCluster() {
756        return cluster;
757    }
758
759    public void setCluster(BrokerId[] cluster) {
760        this.cluster = cluster;
761    }
762
763    @Override
764    public boolean isMessage() {
765        return true;
766    }
767
768    /**
769     * @openwire:property version=3
770     */
771    public long getBrokerInTime() {
772        return this.brokerInTime;
773    }
774
775    public void setBrokerInTime(long brokerInTime) {
776        this.brokerInTime = brokerInTime;
777    }
778
779    /**
780     * @openwire:property version=3
781     */
782    public long getBrokerOutTime() {
783        return this.brokerOutTime;
784    }
785
786    public void setBrokerOutTime(long brokerOutTime) {
787        this.brokerOutTime = brokerOutTime;
788    }
789
790    @Override
791        public boolean isDropped() {
792        return false;
793    }
794
795    /**
796     * @openwire:property version=10
797     */
798    public boolean isJMSXGroupFirstForConsumer() {
799        return jmsXGroupFirstForConsumer;
800    }
801
802    public void setJMSXGroupFirstForConsumer(boolean val) {
803        jmsXGroupFirstForConsumer = val;
804    }
805
806    public void compress() throws IOException {
807        if (!isCompressed()) {
808            storeContent();
809            if (!isCompressed() && getContent() != null) {
810                doCompress();
811            }
812        }
813    }
814
815    protected void doCompress() throws IOException {
816        compressed = true;
817        ByteSequence bytes = getContent();
818        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
819        OutputStream os = new DeflaterOutputStream(bytesOut);
820        os.write(bytes.data, bytes.offset, bytes.length);
821        os.close();
822        setContent(bytesOut.toByteSequence());
823    }
824
825    @Override
826    public String toString() {
827        return toString(null);
828    }
829
830    @Override
831    public String toString(Map<String, Object>overrideFields) {
832        try {
833            getProperties();
834        } catch (IOException e) {
835        }
836        return super.toString(overrideFields);
837    }
838}