001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.io.OutputStream;
023    import java.util.Collections;
024    import java.util.HashMap;
025    import java.util.Map;
026    import java.util.zip.DeflaterOutputStream;
027    
028    import javax.jms.JMSException;
029    
030    import org.apache.activemq.ActiveMQConnection;
031    import org.apache.activemq.advisory.AdvisorySupport;
032    import org.apache.activemq.broker.region.MessageReference;
033    import org.apache.activemq.usage.MemoryUsage;
034    import org.apache.activemq.util.ByteArrayInputStream;
035    import org.apache.activemq.util.ByteArrayOutputStream;
036    import org.apache.activemq.util.ByteSequence;
037    import org.apache.activemq.util.MarshallingSupport;
038    import org.apache.activemq.wireformat.WireFormat;
039    import org.fusesource.hawtbuf.UTF8Buffer;
040    
041    /**
042     * Represents an ActiveMQ message
043     *
044     * @openwire:marshaller
045     *
046     */
047    public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
048        public static final String ORIGINAL_EXPIRATION = "originalExpiration";
049    
050        /**
051         * The default minimum amount of memory a message is assumed to use
052         */
053        public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
054    
055        protected MessageId messageId;
056        protected ActiveMQDestination originalDestination;
057        protected TransactionId originalTransactionId;
058    
059        protected ProducerId producerId;
060        protected ActiveMQDestination destination;
061        protected TransactionId transactionId;
062    
063        protected long expiration;
064        protected long timestamp;
065        protected long arrival;
066        protected long brokerInTime;
067        protected long brokerOutTime;
068        protected String correlationId;
069        protected ActiveMQDestination replyTo;
070        protected boolean persistent;
071        protected String type;
072        protected byte priority;
073        protected String groupID;
074        protected int groupSequence;
075        protected ConsumerId targetConsumerId;
076        protected boolean compressed;
077        protected String userID;
078    
079        protected ByteSequence content;
080        protected ByteSequence marshalledProperties;
081        protected DataStructure dataStructure;
082        protected int redeliveryCounter;
083    
084        protected int size;
085        protected Map<String, Object> properties;
086        protected boolean readOnlyProperties;
087        protected boolean readOnlyBody;
088        protected transient boolean recievedByDFBridge;
089        protected boolean droppable;
090    
091        private transient short referenceCount;
092        private transient ActiveMQConnection connection;
093        transient MessageDestination regionDestination;
094        transient MemoryUsage memoryUsage;
095    
096        private BrokerId[] brokerPath;
097        private BrokerId[] cluster;
098    
099        public static interface MessageDestination {
100            int getMinimumMessageSize();
101            MemoryUsage getMemoryUsage();
102        }
103    
104        public abstract Message copy();
105        public abstract void clearBody() throws JMSException;
106        public abstract void storeContent();
107    
108        // useful to reduce the memory footprint of a persisted message
109        public void clearMarshalledState() throws JMSException {
110            properties = null;
111        }
112    
113        protected void copy(Message copy) {
114            super.copy(copy);
115            copy.producerId = producerId;
116            copy.transactionId = transactionId;
117            copy.destination = destination;
118            copy.messageId = messageId != null ? messageId.copy() : null;
119            copy.originalDestination = originalDestination;
120            copy.originalTransactionId = originalTransactionId;
121            copy.expiration = expiration;
122            copy.timestamp = timestamp;
123            copy.correlationId = correlationId;
124            copy.replyTo = replyTo;
125            copy.persistent = persistent;
126            copy.redeliveryCounter = redeliveryCounter;
127            copy.type = type;
128            copy.priority = priority;
129            copy.size = size;
130            copy.groupID = groupID;
131            copy.userID = userID;
132            copy.groupSequence = groupSequence;
133    
134            if (properties != null) {
135                copy.properties = new HashMap<String, Object>(properties);
136    
137                // The new message hasn't expired, so remove this feild.
138                copy.properties.remove(ORIGINAL_EXPIRATION);
139            } else {
140                copy.properties = properties;
141            }
142    
143            copy.content = content;
144            copy.marshalledProperties = marshalledProperties;
145            copy.dataStructure = dataStructure;
146            copy.readOnlyProperties = readOnlyProperties;
147            copy.readOnlyBody = readOnlyBody;
148            copy.compressed = compressed;
149            copy.recievedByDFBridge = recievedByDFBridge;
150    
151            copy.arrival = arrival;
152            copy.connection = connection;
153            copy.regionDestination = regionDestination;
154            copy.brokerInTime = brokerInTime;
155            copy.brokerOutTime = brokerOutTime;
156            copy.memoryUsage=this.memoryUsage;
157            copy.brokerPath = brokerPath;
158    
159            // lets not copy the following fields
160            // copy.targetConsumerId = targetConsumerId;
161            // copy.referenceCount = referenceCount;
162        }
163    
164        public Object getProperty(String name) throws IOException {
165            if (properties == null) {
166                if (marshalledProperties == null) {
167                    return null;
168                }
169                properties = unmarsallProperties(marshalledProperties);
170            }
171            Object result = properties.get(name);
172            if (result instanceof UTF8Buffer) {
173                result = result.toString();
174            }
175    
176            return result;
177        }
178    
179        @SuppressWarnings("unchecked")
180        public Map<String, Object> getProperties() throws IOException {
181            if (properties == null) {
182                if (marshalledProperties == null) {
183                    return Collections.EMPTY_MAP;
184                }
185                properties = unmarsallProperties(marshalledProperties);
186            }
187            return Collections.unmodifiableMap(properties);
188        }
189    
190        public void clearProperties() {
191            marshalledProperties = null;
192            properties = null;
193        }
194    
195        public void setProperty(String name, Object value) throws IOException {
196            lazyCreateProperties();
197            properties.put(name, value);
198        }
199    
200        public void removeProperty(String name) throws IOException {
201            lazyCreateProperties();
202            properties.remove(name);
203        }
204    
205        protected void lazyCreateProperties() throws IOException {
206            if (properties == null) {
207                if (marshalledProperties == null) {
208                    properties = new HashMap<String, Object>();
209                } else {
210                    properties = unmarsallProperties(marshalledProperties);
211                    marshalledProperties = null;
212                }
213            }
214        }
215    
216        private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
217            return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
218        }
219    
220        @Override
221            public void beforeMarshall(WireFormat wireFormat) throws IOException {
222            // Need to marshal the properties.
223            if (marshalledProperties == null && properties != null) {
224                ByteArrayOutputStream baos = new ByteArrayOutputStream();
225                DataOutputStream os = new DataOutputStream(baos);
226                MarshallingSupport.marshalPrimitiveMap(properties, os);
227                os.close();
228                marshalledProperties = baos.toByteSequence();
229            }
230        }
231    
232        @Override
233            public void afterMarshall(WireFormat wireFormat) throws IOException {
234        }
235    
236        @Override
237            public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
238        }
239    
240        @Override
241            public void afterUnmarshall(WireFormat wireFormat) throws IOException {
242        }
243    
244        // /////////////////////////////////////////////////////////////////
245        //
246        // Simple Field accessors
247        //
248        // /////////////////////////////////////////////////////////////////
249    
250        /**
251         * @openwire:property version=1 cache=true
252         */
253        public ProducerId getProducerId() {
254            return producerId;
255        }
256    
257        public void setProducerId(ProducerId producerId) {
258            this.producerId = producerId;
259        }
260    
261        /**
262         * @openwire:property version=1 cache=true
263         */
264        public ActiveMQDestination getDestination() {
265            return destination;
266        }
267    
268        public void setDestination(ActiveMQDestination destination) {
269            this.destination = destination;
270        }
271    
272        /**
273         * @openwire:property version=1 cache=true
274         */
275        public TransactionId getTransactionId() {
276            return transactionId;
277        }
278    
279        public void setTransactionId(TransactionId transactionId) {
280            this.transactionId = transactionId;
281        }
282    
283        public boolean isInTransaction() {
284            return transactionId != null;
285        }
286    
287        /**
288         * @openwire:property version=1 cache=true
289         */
290        public ActiveMQDestination getOriginalDestination() {
291            return originalDestination;
292        }
293    
294        public void setOriginalDestination(ActiveMQDestination destination) {
295            this.originalDestination = destination;
296        }
297    
298        /**
299         * @openwire:property version=1
300         */
301        @Override
302            public MessageId getMessageId() {
303            return messageId;
304        }
305    
306        public void setMessageId(MessageId messageId) {
307            this.messageId = messageId;
308        }
309    
310        /**
311         * @openwire:property version=1 cache=true
312         */
313        public TransactionId getOriginalTransactionId() {
314            return originalTransactionId;
315        }
316    
317        public void setOriginalTransactionId(TransactionId transactionId) {
318            this.originalTransactionId = transactionId;
319        }
320    
321        /**
322         * @openwire:property version=1
323         */
324        @Override
325            public String getGroupID() {
326            return groupID;
327        }
328    
329        public void setGroupID(String groupID) {
330            this.groupID = groupID;
331        }
332    
333        /**
334         * @openwire:property version=1
335         */
336        @Override
337            public int getGroupSequence() {
338            return groupSequence;
339        }
340    
341        public void setGroupSequence(int groupSequence) {
342            this.groupSequence = groupSequence;
343        }
344    
345        /**
346         * @openwire:property version=1
347         */
348        public String getCorrelationId() {
349            return correlationId;
350        }
351    
352        public void setCorrelationId(String correlationId) {
353            this.correlationId = correlationId;
354        }
355    
356        /**
357         * @openwire:property version=1
358         */
359        @Override
360            public boolean isPersistent() {
361            return persistent;
362        }
363    
364        public void setPersistent(boolean deliveryMode) {
365            this.persistent = deliveryMode;
366        }
367    
368        /**
369         * @openwire:property version=1
370         */
371        @Override
372            public long getExpiration() {
373            return expiration;
374        }
375    
376        public void setExpiration(long expiration) {
377            this.expiration = expiration;
378        }
379    
380        /**
381         * @openwire:property version=1
382         */
383        public byte getPriority() {
384            return priority;
385        }
386    
387        public void setPriority(byte priority) {
388            if (priority < 0) {
389                this.priority = 0;
390            } else if (priority > 9) {
391                this.priority = 9;
392            } else {
393                this.priority = priority;
394            }
395        }
396    
397        /**
398         * @openwire:property version=1
399         */
400        public ActiveMQDestination getReplyTo() {
401            return replyTo;
402        }
403    
404        public void setReplyTo(ActiveMQDestination replyTo) {
405            this.replyTo = replyTo;
406        }
407    
408        /**
409         * @openwire:property version=1
410         */
411        public long getTimestamp() {
412            return timestamp;
413        }
414    
415        public void setTimestamp(long timestamp) {
416            this.timestamp = timestamp;
417        }
418    
419        /**
420         * @openwire:property version=1
421         */
422        public String getType() {
423            return type;
424        }
425    
426        public void setType(String type) {
427            this.type = type;
428        }
429    
430        /**
431         * @openwire:property version=1
432         */
433        public ByteSequence getContent() {
434            return content;
435        }
436    
437        public void setContent(ByteSequence content) {
438            this.content = content;
439        }
440    
441        /**
442         * @openwire:property version=1
443         */
444        public ByteSequence getMarshalledProperties() {
445            return marshalledProperties;
446        }
447    
448        public void setMarshalledProperties(ByteSequence marshalledProperties) {
449            this.marshalledProperties = marshalledProperties;
450        }
451    
452        /**
453         * @openwire:property version=1
454         */
455        public DataStructure getDataStructure() {
456            return dataStructure;
457        }
458    
459        public void setDataStructure(DataStructure data) {
460            this.dataStructure = data;
461        }
462    
463        /**
464         * Can be used to route the message to a specific consumer. Should be null
465         * to allow the broker use normal JMS routing semantics. If the target
466         * consumer id is an active consumer on the broker, the message is dropped.
467         * Used by the AdvisoryBroker to replay advisory messages to a specific
468         * consumer.
469         *
470         * @openwire:property version=1 cache=true
471         */
472        @Override
473            public ConsumerId getTargetConsumerId() {
474            return targetConsumerId;
475        }
476    
477        public void setTargetConsumerId(ConsumerId targetConsumerId) {
478            this.targetConsumerId = targetConsumerId;
479        }
480    
481        @Override
482            public boolean isExpired() {
483            long expireTime = getExpiration();
484            return expireTime > 0 && System.currentTimeMillis() > expireTime;
485        }
486    
487        @Override
488            public boolean isAdvisory() {
489            return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
490        }
491    
492        /**
493         * @openwire:property version=1
494         */
495        public boolean isCompressed() {
496            return compressed;
497        }
498    
499        public void setCompressed(boolean compressed) {
500            this.compressed = compressed;
501        }
502    
503        public boolean isRedelivered() {
504            return redeliveryCounter > 0;
505        }
506    
507        public void setRedelivered(boolean redelivered) {
508            if (redelivered) {
509                if (!isRedelivered()) {
510                    setRedeliveryCounter(1);
511                }
512            } else {
513                if (isRedelivered()) {
514                    setRedeliveryCounter(0);
515                }
516            }
517        }
518    
519        @Override
520            public void incrementRedeliveryCounter() {
521            redeliveryCounter++;
522        }
523    
524        /**
525         * @openwire:property version=1
526         */
527        @Override
528            public int getRedeliveryCounter() {
529            return redeliveryCounter;
530        }
531    
532        public void setRedeliveryCounter(int deliveryCounter) {
533            this.redeliveryCounter = deliveryCounter;
534        }
535    
536        /**
537         * The route of brokers the command has moved through.
538         *
539         * @openwire:property version=1 cache=true
540         */
541        public BrokerId[] getBrokerPath() {
542            return brokerPath;
543        }
544    
545        public void setBrokerPath(BrokerId[] brokerPath) {
546            this.brokerPath = brokerPath;
547        }
548    
549        public boolean isReadOnlyProperties() {
550            return readOnlyProperties;
551        }
552    
553        public void setReadOnlyProperties(boolean readOnlyProperties) {
554            this.readOnlyProperties = readOnlyProperties;
555        }
556    
557        public boolean isReadOnlyBody() {
558            return readOnlyBody;
559        }
560    
561        public void setReadOnlyBody(boolean readOnlyBody) {
562            this.readOnlyBody = readOnlyBody;
563        }
564    
565        public ActiveMQConnection getConnection() {
566            return this.connection;
567        }
568    
569        public void setConnection(ActiveMQConnection connection) {
570            this.connection = connection;
571        }
572    
573        /**
574         * Used to schedule the arrival time of a message to a broker. The broker
575         * will not dispatch a message to a consumer until it's arrival time has
576         * elapsed.
577         *
578         * @openwire:property version=1
579         */
580        public long getArrival() {
581            return arrival;
582        }
583    
584        public void setArrival(long arrival) {
585            this.arrival = arrival;
586        }
587    
588        /**
589         * Only set by the broker and defines the userID of the producer connection
590         * who sent this message. This is an optional field, it needs to be enabled
591         * on the broker to have this field populated.
592         *
593         * @openwire:property version=1
594         */
595        public String getUserID() {
596            return userID;
597        }
598    
599        public void setUserID(String jmsxUserID) {
600            this.userID = jmsxUserID;
601        }
602    
603        @Override
604            public int getReferenceCount() {
605            return referenceCount;
606        }
607    
608        @Override
609            public Message getMessageHardRef() {
610            return this;
611        }
612    
613        @Override
614            public Message getMessage() {
615            return this;
616        }
617    
618        public void setRegionDestination(MessageDestination destination) {
619            this.regionDestination = destination;
620            if(this.memoryUsage==null) {
621                this.memoryUsage=destination.getMemoryUsage();
622            }
623        }
624    
625        @Override
626            public MessageDestination getRegionDestination() {
627            return regionDestination;
628        }
629    
630        public MemoryUsage getMemoryUsage() {
631            return this.memoryUsage;
632        }
633    
634        public void setMemoryUsage(MemoryUsage usage) {
635            this.memoryUsage=usage;
636        }
637    
638        @Override
639        public boolean isMarshallAware() {
640            return true;
641        }
642    
643        @Override
644            public int incrementReferenceCount() {
645            int rc;
646            int size;
647            synchronized (this) {
648                rc = ++referenceCount;
649                size = getSize();
650            }
651    
652            if (rc == 1 && getMemoryUsage() != null) {
653                getMemoryUsage().increaseUsage(size);
654                //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
655    
656            }
657    
658            //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
659            return rc;
660        }
661    
662        @Override
663            public int decrementReferenceCount() {
664            int rc;
665            int size;
666            synchronized (this) {
667                rc = --referenceCount;
668                size = getSize();
669            }
670    
671            if (rc == 0 && getMemoryUsage() != null) {
672                getMemoryUsage().decreaseUsage(size);
673                //Thread.dumpStack();
674                //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
675            }
676    
677            //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
678    
679            return rc;
680        }
681    
682        @Override
683            public int getSize() {
684            int minimumMessageSize = getMinimumMessageSize();
685            if (size < minimumMessageSize || size == 0) {
686                size = minimumMessageSize;
687                if (marshalledProperties != null) {
688                    size += marshalledProperties.getLength();
689                }
690                if (content != null) {
691                    size += content.getLength();
692                }
693            }
694            return size;
695        }
696    
697        protected int getMinimumMessageSize() {
698            int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
699            //let destination override
700            MessageDestination dest = regionDestination;
701            if (dest != null) {
702                result=dest.getMinimumMessageSize();
703            }
704            return result;
705        }
706    
707        /**
708         * @openwire:property version=1
709         * @return Returns the recievedByDFBridge.
710         */
711        public boolean isRecievedByDFBridge() {
712            return recievedByDFBridge;
713        }
714    
715        /**
716         * @param recievedByDFBridge The recievedByDFBridge to set.
717         */
718        public void setRecievedByDFBridge(boolean recievedByDFBridge) {
719            this.recievedByDFBridge = recievedByDFBridge;
720        }
721    
722        public void onMessageRolledBack() {
723            incrementRedeliveryCounter();
724        }
725    
726        /**
727         * @openwire:property version=2 cache=true
728         */
729        public boolean isDroppable() {
730            return droppable;
731        }
732    
733        public void setDroppable(boolean droppable) {
734            this.droppable = droppable;
735        }
736    
737        /**
738         * If a message is stored in multiple nodes on a cluster, all the cluster
739         * members will be listed here. Otherwise, it will be null.
740         *
741         * @openwire:property version=3 cache=true
742         */
743        public BrokerId[] getCluster() {
744            return cluster;
745        }
746    
747        public void setCluster(BrokerId[] cluster) {
748            this.cluster = cluster;
749        }
750    
751        @Override
752        public boolean isMessage() {
753            return true;
754        }
755    
756        /**
757         * @openwire:property version=3
758         */
759        public long getBrokerInTime() {
760            return this.brokerInTime;
761        }
762    
763        public void setBrokerInTime(long brokerInTime) {
764            this.brokerInTime = brokerInTime;
765        }
766    
767        /**
768         * @openwire:property version=3
769         */
770        public long getBrokerOutTime() {
771            return this.brokerOutTime;
772        }
773    
774        public void setBrokerOutTime(long brokerOutTime) {
775            this.brokerOutTime = brokerOutTime;
776        }
777    
778        @Override
779            public boolean isDropped() {
780            return false;
781        }
782    
783        public void compress() throws IOException {
784            if (!isCompressed()) {
785                storeContent();
786                if (!isCompressed() && getContent() != null) {
787                    doCompress();
788                }
789            }
790        }
791    
792        protected void doCompress() throws IOException {
793            compressed = true;
794            ByteSequence bytes = getContent();
795            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
796            OutputStream os = new DeflaterOutputStream(bytesOut);
797            os.write(bytes.data, bytes.offset, bytes.length);
798            os.close();
799            setContent(bytesOut.toByteSequence());
800        }
801    
802        @Override
803        public String toString() {
804            return toString(null);
805        }
806    
807        @Override
808        public String toString(Map<String, Object>overrideFields) {
809            try {
810                getProperties();
811            } catch (IOException e) {
812            }
813            return super.toString(overrideFields);
814        }
815    }