001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.io.OutputStream;
023    import java.util.Collections;
024    import java.util.HashMap;
025    import java.util.Map;
026    import java.util.zip.DeflaterOutputStream;
027    
028    import javax.jms.JMSException;
029    
030    import org.apache.activemq.ActiveMQConnection;
031    import org.apache.activemq.advisory.AdvisorySupport;
032    import org.apache.activemq.broker.region.Destination;
033    import org.apache.activemq.broker.region.MessageReference;
034    import org.apache.activemq.broker.region.RegionBroker;
035    import org.apache.activemq.usage.MemoryUsage;
036    import org.apache.activemq.util.ByteArrayInputStream;
037    import org.apache.activemq.util.ByteArrayOutputStream;
038    import org.apache.activemq.util.ByteSequence;
039    import org.apache.activemq.util.MarshallingSupport;
040    import org.apache.activemq.wireformat.WireFormat;
041    
042    /**
043     * Represents an ActiveMQ message
044     *
045     * @openwire:marshaller
046     *
047     */
048    public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
049    
050        /**
051         * The default minimum amount of memory a message is assumed to use
052         */
053        public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
054    
055        protected MessageId messageId;
056        protected ActiveMQDestination originalDestination;
057        protected TransactionId originalTransactionId;
058    
059        protected ProducerId producerId;
060        protected ActiveMQDestination destination;
061        protected TransactionId transactionId;
062    
063        protected long expiration;
064        protected long timestamp;
065        protected long arrival;
066        protected long brokerInTime;
067        protected long brokerOutTime;
068        protected String correlationId;
069        protected ActiveMQDestination replyTo;
070        protected boolean persistent;
071        protected String type;
072        protected byte priority;
073        protected String groupID;
074        protected int groupSequence;
075        protected ConsumerId targetConsumerId;
076        protected boolean compressed;
077        protected String userID;
078    
079        protected ByteSequence content;
080        protected ByteSequence marshalledProperties;
081        protected DataStructure dataStructure;
082        protected int redeliveryCounter;
083    
084        protected int size;
085        protected Map<String, Object> properties;
086        protected boolean readOnlyProperties;
087        protected boolean readOnlyBody;
088        protected transient boolean recievedByDFBridge;
089        protected boolean droppable;
090    
091        private transient short referenceCount;
092        private transient ActiveMQConnection connection;
093        private transient org.apache.activemq.broker.region.Destination regionDestination;
094        private transient MemoryUsage memoryUsage;
095    
096        private BrokerId[] brokerPath;
097        private BrokerId[] cluster;
098    
099        public abstract Message copy();
100        public abstract void clearBody() throws JMSException;
101        public abstract void storeContent();
102    
103        // useful to reduce the memory footprint of a persisted message
104        public void clearMarshalledState() throws JMSException {
105            properties = null;
106        }
107    
108        protected void copy(Message copy) {
109            super.copy(copy);
110            copy.producerId = producerId;
111            copy.transactionId = transactionId;
112            copy.destination = destination;
113            copy.messageId = messageId != null ? messageId.copy() : null;
114            copy.originalDestination = originalDestination;
115            copy.originalTransactionId = originalTransactionId;
116            copy.expiration = expiration;
117            copy.timestamp = timestamp;
118            copy.correlationId = correlationId;
119            copy.replyTo = replyTo;
120            copy.persistent = persistent;
121            copy.redeliveryCounter = redeliveryCounter;
122            copy.type = type;
123            copy.priority = priority;
124            copy.size = size;
125            copy.groupID = groupID;
126            copy.userID = userID;
127            copy.groupSequence = groupSequence;
128    
129            if (properties != null) {
130                copy.properties = new HashMap<String, Object>(properties);
131    
132                // The new message hasn't expired, so remove this feild.
133                copy.properties.remove(RegionBroker.ORIGINAL_EXPIRATION);
134            } else {
135                copy.properties = properties;
136            }
137    
138            copy.content = content;
139            copy.marshalledProperties = marshalledProperties;
140            copy.dataStructure = dataStructure;
141            copy.readOnlyProperties = readOnlyProperties;
142            copy.readOnlyBody = readOnlyBody;
143            copy.compressed = compressed;
144            copy.recievedByDFBridge = recievedByDFBridge;
145    
146            copy.arrival = arrival;
147            copy.connection = connection;
148            copy.regionDestination = regionDestination;
149            copy.brokerInTime = brokerInTime;
150            copy.brokerOutTime = brokerOutTime;
151            copy.memoryUsage=this.memoryUsage;
152            copy.brokerPath = brokerPath;
153    
154            // lets not copy the following fields
155            // copy.targetConsumerId = targetConsumerId;
156            // copy.referenceCount = referenceCount;
157        }
158    
159        public Object getProperty(String name) throws IOException {
160            if (properties == null) {
161                if (marshalledProperties == null) {
162                    return null;
163                }
164                properties = unmarsallProperties(marshalledProperties);
165            }
166            return properties.get(name);
167        }
168    
169        @SuppressWarnings("unchecked")
170        public Map<String, Object> getProperties() throws IOException {
171            if (properties == null) {
172                if (marshalledProperties == null) {
173                    return Collections.EMPTY_MAP;
174                }
175                properties = unmarsallProperties(marshalledProperties);
176            }
177            return Collections.unmodifiableMap(properties);
178        }
179    
180        public void clearProperties() {
181            marshalledProperties = null;
182            properties = null;
183        }
184    
185        public void setProperty(String name, Object value) throws IOException {
186            lazyCreateProperties();
187            properties.put(name, value);
188        }
189    
190        public void removeProperty(String name) throws IOException {
191            lazyCreateProperties();
192            properties.remove(name);
193        }
194    
195        protected void lazyCreateProperties() throws IOException {
196            if (properties == null) {
197                if (marshalledProperties == null) {
198                    properties = new HashMap<String, Object>();
199                } else {
200                    properties = unmarsallProperties(marshalledProperties);
201                    marshalledProperties = null;
202                }
203            }
204        }
205    
206        private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
207            return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
208        }
209    
210        public void beforeMarshall(WireFormat wireFormat) throws IOException {
211            // Need to marshal the properties.
212            if (marshalledProperties == null && properties != null) {
213                ByteArrayOutputStream baos = new ByteArrayOutputStream();
214                DataOutputStream os = new DataOutputStream(baos);
215                MarshallingSupport.marshalPrimitiveMap(properties, os);
216                os.close();
217                marshalledProperties = baos.toByteSequence();
218            }
219        }
220    
221        public void afterMarshall(WireFormat wireFormat) throws IOException {
222        }
223    
224        public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
225        }
226    
227        public void afterUnmarshall(WireFormat wireFormat) throws IOException {
228        }
229    
230        // /////////////////////////////////////////////////////////////////
231        //
232        // Simple Field accessors
233        //
234        // /////////////////////////////////////////////////////////////////
235    
236        /**
237         * @openwire:property version=1 cache=true
238         */
239        public ProducerId getProducerId() {
240            return producerId;
241        }
242    
243        public void setProducerId(ProducerId producerId) {
244            this.producerId = producerId;
245        }
246    
247        /**
248         * @openwire:property version=1 cache=true
249         */
250        public ActiveMQDestination getDestination() {
251            return destination;
252        }
253    
254        public void setDestination(ActiveMQDestination destination) {
255            this.destination = destination;
256        }
257    
258        /**
259         * @openwire:property version=1 cache=true
260         */
261        public TransactionId getTransactionId() {
262            return transactionId;
263        }
264    
265        public void setTransactionId(TransactionId transactionId) {
266            this.transactionId = transactionId;
267        }
268    
269        public boolean isInTransaction() {
270            return transactionId != null;
271        }
272    
273        /**
274         * @openwire:property version=1 cache=true
275         */
276        public ActiveMQDestination getOriginalDestination() {
277            return originalDestination;
278        }
279    
280        public void setOriginalDestination(ActiveMQDestination destination) {
281            this.originalDestination = destination;
282        }
283    
284        /**
285         * @openwire:property version=1
286         */
287        public MessageId getMessageId() {
288            return messageId;
289        }
290    
291        public void setMessageId(MessageId messageId) {
292            this.messageId = messageId;
293        }
294    
295        /**
296         * @openwire:property version=1 cache=true
297         */
298        public TransactionId getOriginalTransactionId() {
299            return originalTransactionId;
300        }
301    
302        public void setOriginalTransactionId(TransactionId transactionId) {
303            this.originalTransactionId = transactionId;
304        }
305    
306        /**
307         * @openwire:property version=1
308         */
309        public String getGroupID() {
310            return groupID;
311        }
312    
313        public void setGroupID(String groupID) {
314            this.groupID = groupID;
315        }
316    
317        /**
318         * @openwire:property version=1
319         */
320        public int getGroupSequence() {
321            return groupSequence;
322        }
323    
324        public void setGroupSequence(int groupSequence) {
325            this.groupSequence = groupSequence;
326        }
327    
328        /**
329         * @openwire:property version=1
330         */
331        public String getCorrelationId() {
332            return correlationId;
333        }
334    
335        public void setCorrelationId(String correlationId) {
336            this.correlationId = correlationId;
337        }
338    
339        /**
340         * @openwire:property version=1
341         */
342        public boolean isPersistent() {
343            return persistent;
344        }
345    
346        public void setPersistent(boolean deliveryMode) {
347            this.persistent = deliveryMode;
348        }
349    
350        /**
351         * @openwire:property version=1
352         */
353        public long getExpiration() {
354            return expiration;
355        }
356    
357        public void setExpiration(long expiration) {
358            this.expiration = expiration;
359        }
360    
361        /**
362         * @openwire:property version=1
363         */
364        public byte getPriority() {
365            return priority;
366        }
367    
368        public void setPriority(byte priority) {
369            if (priority < 0) {
370                this.priority = 0;
371            } else if (priority > 9) {
372                this.priority = 9;
373            } else {
374                this.priority = priority;
375            }
376        }
377    
378        /**
379         * @openwire:property version=1
380         */
381        public ActiveMQDestination getReplyTo() {
382            return replyTo;
383        }
384    
385        public void setReplyTo(ActiveMQDestination replyTo) {
386            this.replyTo = replyTo;
387        }
388    
389        /**
390         * @openwire:property version=1
391         */
392        public long getTimestamp() {
393            return timestamp;
394        }
395    
396        public void setTimestamp(long timestamp) {
397            this.timestamp = timestamp;
398        }
399    
400        /**
401         * @openwire:property version=1
402         */
403        public String getType() {
404            return type;
405        }
406    
407        public void setType(String type) {
408            this.type = type;
409        }
410    
411        /**
412         * @openwire:property version=1
413         */
414        public ByteSequence getContent() {
415            return content;
416        }
417    
418        public void setContent(ByteSequence content) {
419            this.content = content;
420        }
421    
422        /**
423         * @openwire:property version=1
424         */
425        public ByteSequence getMarshalledProperties() {
426            return marshalledProperties;
427        }
428    
429        public void setMarshalledProperties(ByteSequence marshalledProperties) {
430            this.marshalledProperties = marshalledProperties;
431        }
432    
433        /**
434         * @openwire:property version=1
435         */
436        public DataStructure getDataStructure() {
437            return dataStructure;
438        }
439    
440        public void setDataStructure(DataStructure data) {
441            this.dataStructure = data;
442        }
443    
444        /**
445         * Can be used to route the message to a specific consumer. Should be null
446         * to allow the broker use normal JMS routing semantics. If the target
447         * consumer id is an active consumer on the broker, the message is dropped.
448         * Used by the AdvisoryBroker to replay advisory messages to a specific
449         * consumer.
450         *
451         * @openwire:property version=1 cache=true
452         */
453        public ConsumerId getTargetConsumerId() {
454            return targetConsumerId;
455        }
456    
457        public void setTargetConsumerId(ConsumerId targetConsumerId) {
458            this.targetConsumerId = targetConsumerId;
459        }
460    
461        public boolean isExpired() {
462            long expireTime = getExpiration();
463            return expireTime > 0 && System.currentTimeMillis() > expireTime;
464        }
465    
466        public boolean isAdvisory() {
467            return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
468        }
469    
470        /**
471         * @openwire:property version=1
472         */
473        public boolean isCompressed() {
474            return compressed;
475        }
476    
477        public void setCompressed(boolean compressed) {
478            this.compressed = compressed;
479        }
480    
481        public boolean isRedelivered() {
482            return redeliveryCounter > 0;
483        }
484    
485        public void setRedelivered(boolean redelivered) {
486            if (redelivered) {
487                if (!isRedelivered()) {
488                    setRedeliveryCounter(1);
489                }
490            } else {
491                if (isRedelivered()) {
492                    setRedeliveryCounter(0);
493                }
494            }
495        }
496    
497        public void incrementRedeliveryCounter() {
498            redeliveryCounter++;
499        }
500    
501        /**
502         * @openwire:property version=1
503         */
504        public int getRedeliveryCounter() {
505            return redeliveryCounter;
506        }
507    
508        public void setRedeliveryCounter(int deliveryCounter) {
509            this.redeliveryCounter = deliveryCounter;
510        }
511    
512        /**
513         * The route of brokers the command has moved through.
514         *
515         * @openwire:property version=1 cache=true
516         */
517        public BrokerId[] getBrokerPath() {
518            return brokerPath;
519        }
520    
521        public void setBrokerPath(BrokerId[] brokerPath) {
522            this.brokerPath = brokerPath;
523        }
524    
525        public boolean isReadOnlyProperties() {
526            return readOnlyProperties;
527        }
528    
529        public void setReadOnlyProperties(boolean readOnlyProperties) {
530            this.readOnlyProperties = readOnlyProperties;
531        }
532    
533        public boolean isReadOnlyBody() {
534            return readOnlyBody;
535        }
536    
537        public void setReadOnlyBody(boolean readOnlyBody) {
538            this.readOnlyBody = readOnlyBody;
539        }
540    
541        public ActiveMQConnection getConnection() {
542            return this.connection;
543        }
544    
545        public void setConnection(ActiveMQConnection connection) {
546            this.connection = connection;
547        }
548    
549        /**
550         * Used to schedule the arrival time of a message to a broker. The broker
551         * will not dispatch a message to a consumer until it's arrival time has
552         * elapsed.
553         *
554         * @openwire:property version=1
555         */
556        public long getArrival() {
557            return arrival;
558        }
559    
560        public void setArrival(long arrival) {
561            this.arrival = arrival;
562        }
563    
564        /**
565         * Only set by the broker and defines the userID of the producer connection
566         * who sent this message. This is an optional field, it needs to be enabled
567         * on the broker to have this field populated.
568         *
569         * @openwire:property version=1
570         */
571        public String getUserID() {
572            return userID;
573        }
574    
575        public void setUserID(String jmsxUserID) {
576            this.userID = jmsxUserID;
577        }
578    
579        public int getReferenceCount() {
580            return referenceCount;
581        }
582    
583        public Message getMessageHardRef() {
584            return this;
585        }
586    
587        public Message getMessage() {
588            return this;
589        }
590    
591        public org.apache.activemq.broker.region.Destination getRegionDestination() {
592            return regionDestination;
593        }
594    
595        public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
596            this.regionDestination = destination;
597            if(this.memoryUsage==null) {
598                this.memoryUsage=regionDestination.getMemoryUsage();
599            }
600        }
601    
602        public MemoryUsage getMemoryUsage() {
603            return this.memoryUsage;
604        }
605    
606        public void setMemoryUsage(MemoryUsage usage) {
607            this.memoryUsage=usage;
608        }
609    
610        @Override
611        public boolean isMarshallAware() {
612            return true;
613        }
614    
615        public int incrementReferenceCount() {
616            int rc;
617            int size;
618            synchronized (this) {
619                rc = ++referenceCount;
620                size = getSize();
621            }
622    
623            if (rc == 1 && getMemoryUsage() != null) {
624                getMemoryUsage().increaseUsage(size);
625                //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
626    
627            }
628    
629            //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
630            return rc;
631        }
632    
633        public int decrementReferenceCount() {
634            int rc;
635            int size;
636            synchronized (this) {
637                rc = --referenceCount;
638                size = getSize();
639            }
640    
641            if (rc == 0 && getMemoryUsage() != null) {
642                getMemoryUsage().decreaseUsage(size);
643                //Thread.dumpStack();
644                //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
645            }
646    
647            //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
648    
649            return rc;
650        }
651    
652        public int getSize() {
653            int minimumMessageSize = getMinimumMessageSize();
654            if (size < minimumMessageSize || size == 0) {
655                size = minimumMessageSize;
656                if (marshalledProperties != null) {
657                    size += marshalledProperties.getLength();
658                }
659                if (content != null) {
660                    size += content.getLength();
661                }
662            }
663            return size;
664        }
665    
666        protected int getMinimumMessageSize() {
667            int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
668            //let destination override
669            Destination dest = regionDestination;
670            if (dest != null) {
671                result=dest.getMinimumMessageSize();
672            }
673            return result;
674        }
675    
676        /**
677         * @openwire:property version=1
678         * @return Returns the recievedByDFBridge.
679         */
680        public boolean isRecievedByDFBridge() {
681            return recievedByDFBridge;
682        }
683    
684        /**
685         * @param recievedByDFBridge The recievedByDFBridge to set.
686         */
687        public void setRecievedByDFBridge(boolean recievedByDFBridge) {
688            this.recievedByDFBridge = recievedByDFBridge;
689        }
690    
691        public void onMessageRolledBack() {
692            incrementRedeliveryCounter();
693        }
694    
695        /**
696         * @openwire:property version=2 cache=true
697         */
698        public boolean isDroppable() {
699            return droppable;
700        }
701    
702        public void setDroppable(boolean droppable) {
703            this.droppable = droppable;
704        }
705    
706        /**
707         * If a message is stored in multiple nodes on a cluster, all the cluster
708         * members will be listed here. Otherwise, it will be null.
709         *
710         * @openwire:property version=3 cache=true
711         */
712        public BrokerId[] getCluster() {
713            return cluster;
714        }
715    
716        public void setCluster(BrokerId[] cluster) {
717            this.cluster = cluster;
718        }
719    
720        @Override
721        public boolean isMessage() {
722            return true;
723        }
724    
725        /**
726         * @openwire:property version=3
727         */
728        public long getBrokerInTime() {
729            return this.brokerInTime;
730        }
731    
732        public void setBrokerInTime(long brokerInTime) {
733            this.brokerInTime = brokerInTime;
734        }
735    
736        /**
737         * @openwire:property version=3
738         */
739        public long getBrokerOutTime() {
740            return this.brokerOutTime;
741        }
742    
743        public void setBrokerOutTime(long brokerOutTime) {
744            this.brokerOutTime = brokerOutTime;
745        }
746    
747        public boolean isDropped() {
748            return false;
749        }
750    
751        public void compress() throws IOException {
752            if (!isCompressed()) {
753                storeContent();
754                if (!isCompressed() && getContent() != null) {
755                    doCompress();
756                }
757            }
758        }
759    
760        protected void doCompress() throws IOException {
761            compressed = true;
762            ByteSequence bytes = getContent();
763            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
764            OutputStream os = new DeflaterOutputStream(bytesOut);
765            os.write(bytes.data, bytes.offset, bytes.length);
766            os.close();
767            setContent(bytesOut.toByteSequence());
768        }
769    
770        @Override
771        public String toString() {
772            return toString(null);
773        }
774    
775        @Override
776        public String toString(Map<String, Object>overrideFields) {
777            try {
778                getProperties();
779            } catch (IOException e) {
780            }
781            return super.toString(overrideFields);
782        }
783    }