001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.io.DataInputStream;
020    import java.io.DataOutputStream;
021    import java.io.IOException;
022    import java.io.OutputStream;
023    import java.util.Collections;
024    import java.util.HashMap;
025    import java.util.Map;
026    import java.util.zip.DeflaterOutputStream;
027    
028    import javax.jms.JMSException;
029    
030    import org.apache.activemq.ActiveMQConnection;
031    import org.apache.activemq.advisory.AdvisorySupport;
032    import org.apache.activemq.broker.region.MessageReference;
033    import org.apache.activemq.usage.MemoryUsage;
034    import org.apache.activemq.util.ByteArrayInputStream;
035    import org.apache.activemq.util.ByteArrayOutputStream;
036    import org.apache.activemq.util.ByteSequence;
037    import org.apache.activemq.util.MarshallingSupport;
038    import org.apache.activemq.wireformat.WireFormat;
039    import org.fusesource.hawtbuf.UTF8Buffer;
040    
041    /**
042     * Represents an ActiveMQ message
043     *
044     * @openwire:marshaller
045     *
046     */
047    public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
048        public static final String ORIGINAL_EXPIRATION = "originalExpiration";
049    
050        /**
051         * The default minimum amount of memory a message is assumed to use
052         */
053        public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
054    
055        protected MessageId messageId;
056        protected ActiveMQDestination originalDestination;
057        protected TransactionId originalTransactionId;
058    
059        protected ProducerId producerId;
060        protected ActiveMQDestination destination;
061        protected TransactionId transactionId;
062    
063        protected long expiration;
064        protected long timestamp;
065        protected long arrival;
066        protected long brokerInTime;
067        protected long brokerOutTime;
068        protected String correlationId;
069        protected ActiveMQDestination replyTo;
070        protected boolean persistent;
071        protected String type;
072        protected byte priority;
073        protected String groupID;
074        protected int groupSequence;
075        protected ConsumerId targetConsumerId;
076        protected boolean compressed;
077        protected String userID;
078    
079        protected ByteSequence content;
080        protected ByteSequence marshalledProperties;
081        protected DataStructure dataStructure;
082        protected int redeliveryCounter;
083    
084        protected int size;
085        protected Map<String, Object> properties;
086        protected boolean readOnlyProperties;
087        protected boolean readOnlyBody;
088        protected transient boolean recievedByDFBridge;
089        protected boolean droppable;
090        protected boolean jmsXGroupFirstForConsumer;
091    
092        private transient short referenceCount;
093        private transient ActiveMQConnection connection;
094        transient MessageDestination regionDestination;
095        transient MemoryUsage memoryUsage;
096    
097        private BrokerId[] brokerPath;
098        private BrokerId[] cluster;
099    
100        public static interface MessageDestination {
101            int getMinimumMessageSize();
102            MemoryUsage getMemoryUsage();
103        }
104    
105        public abstract Message copy();
106        public abstract void clearBody() throws JMSException;
107        public abstract void storeContent();
108        public abstract void storeContentAndClear();
109    
110        // useful to reduce the memory footprint of a persisted message
111        public void clearMarshalledState() throws JMSException {
112            properties = null;
113        }
114    
115        protected void copy(Message copy) {
116            super.copy(copy);
117            copy.producerId = producerId;
118            copy.transactionId = transactionId;
119            copy.destination = destination;
120            copy.messageId = messageId != null ? messageId.copy() : null;
121            copy.originalDestination = originalDestination;
122            copy.originalTransactionId = originalTransactionId;
123            copy.expiration = expiration;
124            copy.timestamp = timestamp;
125            copy.correlationId = correlationId;
126            copy.replyTo = replyTo;
127            copy.persistent = persistent;
128            copy.redeliveryCounter = redeliveryCounter;
129            copy.type = type;
130            copy.priority = priority;
131            copy.size = size;
132            copy.groupID = groupID;
133            copy.userID = userID;
134            copy.groupSequence = groupSequence;
135    
136            if (properties != null) {
137                copy.properties = new HashMap<String, Object>(properties);
138    
139                // The new message hasn't expired, so remove this feild.
140                copy.properties.remove(ORIGINAL_EXPIRATION);
141            } else {
142                copy.properties = properties;
143            }
144    
145            copy.content = content;
146            copy.marshalledProperties = marshalledProperties;
147            copy.dataStructure = dataStructure;
148            copy.readOnlyProperties = readOnlyProperties;
149            copy.readOnlyBody = readOnlyBody;
150            copy.compressed = compressed;
151            copy.recievedByDFBridge = recievedByDFBridge;
152    
153            copy.arrival = arrival;
154            copy.connection = connection;
155            copy.regionDestination = regionDestination;
156            copy.brokerInTime = brokerInTime;
157            copy.brokerOutTime = brokerOutTime;
158            copy.memoryUsage=this.memoryUsage;
159            copy.brokerPath = brokerPath;
160            copy.jmsXGroupFirstForConsumer = jmsXGroupFirstForConsumer;
161    
162            // lets not copy the following fields
163            // copy.targetConsumerId = targetConsumerId;
164            // copy.referenceCount = referenceCount;
165        }
166    
167        public Object getProperty(String name) throws IOException {
168            if (properties == null) {
169                if (marshalledProperties == null) {
170                    return null;
171                }
172                properties = unmarsallProperties(marshalledProperties);
173            }
174            Object result = properties.get(name);
175            if (result instanceof UTF8Buffer) {
176                result = result.toString();
177            }
178    
179            return result;
180        }
181    
182        @SuppressWarnings("unchecked")
183        public Map<String, Object> getProperties() throws IOException {
184            if (properties == null) {
185                if (marshalledProperties == null) {
186                    return Collections.EMPTY_MAP;
187                }
188                properties = unmarsallProperties(marshalledProperties);
189            }
190            return Collections.unmodifiableMap(properties);
191        }
192    
193        public void clearProperties() {
194            marshalledProperties = null;
195            properties = null;
196        }
197    
198        public void setProperty(String name, Object value) throws IOException {
199            lazyCreateProperties();
200            properties.put(name, value);
201        }
202    
203        public void removeProperty(String name) throws IOException {
204            lazyCreateProperties();
205            properties.remove(name);
206        }
207    
208        protected void lazyCreateProperties() throws IOException {
209            if (properties == null) {
210                if (marshalledProperties == null) {
211                    properties = new HashMap<String, Object>();
212                } else {
213                    properties = unmarsallProperties(marshalledProperties);
214                    marshalledProperties = null;
215                }
216            }
217        }
218    
219        private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
220            return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
221        }
222    
223        @Override
224            public void beforeMarshall(WireFormat wireFormat) throws IOException {
225            // Need to marshal the properties.
226            if (marshalledProperties == null && properties != null) {
227                ByteArrayOutputStream baos = new ByteArrayOutputStream();
228                DataOutputStream os = new DataOutputStream(baos);
229                MarshallingSupport.marshalPrimitiveMap(properties, os);
230                os.close();
231                marshalledProperties = baos.toByteSequence();
232            }
233        }
234    
235        @Override
236            public void afterMarshall(WireFormat wireFormat) throws IOException {
237        }
238    
239        @Override
240            public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
241        }
242    
243        @Override
244            public void afterUnmarshall(WireFormat wireFormat) throws IOException {
245        }
246    
247        // /////////////////////////////////////////////////////////////////
248        //
249        // Simple Field accessors
250        //
251        // /////////////////////////////////////////////////////////////////
252    
253        /**
254         * @openwire:property version=1 cache=true
255         */
256        public ProducerId getProducerId() {
257            return producerId;
258        }
259    
260        public void setProducerId(ProducerId producerId) {
261            this.producerId = producerId;
262        }
263    
264        /**
265         * @openwire:property version=1 cache=true
266         */
267        public ActiveMQDestination getDestination() {
268            return destination;
269        }
270    
271        public void setDestination(ActiveMQDestination destination) {
272            this.destination = destination;
273        }
274    
275        /**
276         * @openwire:property version=1 cache=true
277         */
278        public TransactionId getTransactionId() {
279            return transactionId;
280        }
281    
282        public void setTransactionId(TransactionId transactionId) {
283            this.transactionId = transactionId;
284        }
285    
286        public boolean isInTransaction() {
287            return transactionId != null;
288        }
289    
290        /**
291         * @openwire:property version=1 cache=true
292         */
293        public ActiveMQDestination getOriginalDestination() {
294            return originalDestination;
295        }
296    
297        public void setOriginalDestination(ActiveMQDestination destination) {
298            this.originalDestination = destination;
299        }
300    
301        /**
302         * @openwire:property version=1
303         */
304        @Override
305            public MessageId getMessageId() {
306            return messageId;
307        }
308    
309        public void setMessageId(MessageId messageId) {
310            this.messageId = messageId;
311        }
312    
313        /**
314         * @openwire:property version=1 cache=true
315         */
316        public TransactionId getOriginalTransactionId() {
317            return originalTransactionId;
318        }
319    
320        public void setOriginalTransactionId(TransactionId transactionId) {
321            this.originalTransactionId = transactionId;
322        }
323    
324        /**
325         * @openwire:property version=1
326         */
327        @Override
328            public String getGroupID() {
329            return groupID;
330        }
331    
332        public void setGroupID(String groupID) {
333            this.groupID = groupID;
334        }
335    
336        /**
337         * @openwire:property version=1
338         */
339        @Override
340            public int getGroupSequence() {
341            return groupSequence;
342        }
343    
344        public void setGroupSequence(int groupSequence) {
345            this.groupSequence = groupSequence;
346        }
347    
348        /**
349         * @openwire:property version=1
350         */
351        public String getCorrelationId() {
352            return correlationId;
353        }
354    
355        public void setCorrelationId(String correlationId) {
356            this.correlationId = correlationId;
357        }
358    
359        /**
360         * @openwire:property version=1
361         */
362        @Override
363            public boolean isPersistent() {
364            return persistent;
365        }
366    
367        public void setPersistent(boolean deliveryMode) {
368            this.persistent = deliveryMode;
369        }
370    
371        /**
372         * @openwire:property version=1
373         */
374        @Override
375            public long getExpiration() {
376            return expiration;
377        }
378    
379        public void setExpiration(long expiration) {
380            this.expiration = expiration;
381        }
382    
383        /**
384         * @openwire:property version=1
385         */
386        public byte getPriority() {
387            return priority;
388        }
389    
390        public void setPriority(byte priority) {
391            if (priority < 0) {
392                this.priority = 0;
393            } else if (priority > 9) {
394                this.priority = 9;
395            } else {
396                this.priority = priority;
397            }
398        }
399    
400        /**
401         * @openwire:property version=1
402         */
403        public ActiveMQDestination getReplyTo() {
404            return replyTo;
405        }
406    
407        public void setReplyTo(ActiveMQDestination replyTo) {
408            this.replyTo = replyTo;
409        }
410    
411        /**
412         * @openwire:property version=1
413         */
414        public long getTimestamp() {
415            return timestamp;
416        }
417    
418        public void setTimestamp(long timestamp) {
419            this.timestamp = timestamp;
420        }
421    
422        /**
423         * @openwire:property version=1
424         */
425        public String getType() {
426            return type;
427        }
428    
429        public void setType(String type) {
430            this.type = type;
431        }
432    
433        /**
434         * @openwire:property version=1
435         */
436        public ByteSequence getContent() {
437            return content;
438        }
439    
440        public void setContent(ByteSequence content) {
441            this.content = content;
442        }
443    
444        /**
445         * @openwire:property version=1
446         */
447        public ByteSequence getMarshalledProperties() {
448            return marshalledProperties;
449        }
450    
451        public void setMarshalledProperties(ByteSequence marshalledProperties) {
452            this.marshalledProperties = marshalledProperties;
453        }
454    
455        /**
456         * @openwire:property version=1
457         */
458        public DataStructure getDataStructure() {
459            return dataStructure;
460        }
461    
462        public void setDataStructure(DataStructure data) {
463            this.dataStructure = data;
464        }
465    
466        /**
467         * Can be used to route the message to a specific consumer. Should be null
468         * to allow the broker use normal JMS routing semantics. If the target
469         * consumer id is an active consumer on the broker, the message is dropped.
470         * Used by the AdvisoryBroker to replay advisory messages to a specific
471         * consumer.
472         *
473         * @openwire:property version=1 cache=true
474         */
475        @Override
476            public ConsumerId getTargetConsumerId() {
477            return targetConsumerId;
478        }
479    
480        public void setTargetConsumerId(ConsumerId targetConsumerId) {
481            this.targetConsumerId = targetConsumerId;
482        }
483    
484        @Override
485            public boolean isExpired() {
486            long expireTime = getExpiration();
487            return expireTime > 0 && System.currentTimeMillis() > expireTime;
488        }
489    
490        @Override
491            public boolean isAdvisory() {
492            return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
493        }
494    
495        /**
496         * @openwire:property version=1
497         */
498        public boolean isCompressed() {
499            return compressed;
500        }
501    
502        public void setCompressed(boolean compressed) {
503            this.compressed = compressed;
504        }
505    
506        public boolean isRedelivered() {
507            return redeliveryCounter > 0;
508        }
509    
510        public void setRedelivered(boolean redelivered) {
511            if (redelivered) {
512                if (!isRedelivered()) {
513                    setRedeliveryCounter(1);
514                }
515            } else {
516                if (isRedelivered()) {
517                    setRedeliveryCounter(0);
518                }
519            }
520        }
521    
522        @Override
523            public void incrementRedeliveryCounter() {
524            redeliveryCounter++;
525        }
526    
527        /**
528         * @openwire:property version=1
529         */
530        @Override
531            public int getRedeliveryCounter() {
532            return redeliveryCounter;
533        }
534    
535        public void setRedeliveryCounter(int deliveryCounter) {
536            this.redeliveryCounter = deliveryCounter;
537        }
538    
539        /**
540         * The route of brokers the command has moved through.
541         *
542         * @openwire:property version=1 cache=true
543         */
544        public BrokerId[] getBrokerPath() {
545            return brokerPath;
546        }
547    
548        public void setBrokerPath(BrokerId[] brokerPath) {
549            this.brokerPath = brokerPath;
550        }
551    
552        public boolean isReadOnlyProperties() {
553            return readOnlyProperties;
554        }
555    
556        public void setReadOnlyProperties(boolean readOnlyProperties) {
557            this.readOnlyProperties = readOnlyProperties;
558        }
559    
560        public boolean isReadOnlyBody() {
561            return readOnlyBody;
562        }
563    
564        public void setReadOnlyBody(boolean readOnlyBody) {
565            this.readOnlyBody = readOnlyBody;
566        }
567    
568        public ActiveMQConnection getConnection() {
569            return this.connection;
570        }
571    
572        public void setConnection(ActiveMQConnection connection) {
573            this.connection = connection;
574        }
575    
576        /**
577         * Used to schedule the arrival time of a message to a broker. The broker
578         * will not dispatch a message to a consumer until it's arrival time has
579         * elapsed.
580         *
581         * @openwire:property version=1
582         */
583        public long getArrival() {
584            return arrival;
585        }
586    
587        public void setArrival(long arrival) {
588            this.arrival = arrival;
589        }
590    
591        /**
592         * Only set by the broker and defines the userID of the producer connection
593         * who sent this message. This is an optional field, it needs to be enabled
594         * on the broker to have this field populated.
595         *
596         * @openwire:property version=1
597         */
598        public String getUserID() {
599            return userID;
600        }
601    
602        public void setUserID(String jmsxUserID) {
603            this.userID = jmsxUserID;
604        }
605    
606        @Override
607            public int getReferenceCount() {
608            return referenceCount;
609        }
610    
611        @Override
612            public Message getMessageHardRef() {
613            return this;
614        }
615    
616        @Override
617            public Message getMessage() {
618            return this;
619        }
620    
621        public void setRegionDestination(MessageDestination destination) {
622            this.regionDestination = destination;
623            if(this.memoryUsage==null) {
624                this.memoryUsage=destination.getMemoryUsage();
625            }
626        }
627    
628        @Override
629            public MessageDestination getRegionDestination() {
630            return regionDestination;
631        }
632    
633        public MemoryUsage getMemoryUsage() {
634            return this.memoryUsage;
635        }
636    
637        public void setMemoryUsage(MemoryUsage usage) {
638            this.memoryUsage=usage;
639        }
640    
641        @Override
642        public boolean isMarshallAware() {
643            return true;
644        }
645    
646        @Override
647            public int incrementReferenceCount() {
648            int rc;
649            int size;
650            synchronized (this) {
651                rc = ++referenceCount;
652                size = getSize();
653            }
654    
655            if (rc == 1 && getMemoryUsage() != null) {
656                getMemoryUsage().increaseUsage(size);
657                //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
658    
659            }
660    
661            //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
662            return rc;
663        }
664    
665        @Override
666            public int decrementReferenceCount() {
667            int rc;
668            int size;
669            synchronized (this) {
670                rc = --referenceCount;
671                size = getSize();
672            }
673    
674            if (rc == 0 && getMemoryUsage() != null) {
675                getMemoryUsage().decreaseUsage(size);
676                //Thread.dumpStack();
677                //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
678            }
679    
680            //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
681    
682            return rc;
683        }
684    
685        @Override
686            public int getSize() {
687            int minimumMessageSize = getMinimumMessageSize();
688            if (size < minimumMessageSize || size == 0) {
689                size = minimumMessageSize;
690                if (marshalledProperties != null) {
691                    size += marshalledProperties.getLength();
692                }
693                if (content != null) {
694                    size += content.getLength();
695                }
696            }
697            return size;
698        }
699    
700        protected int getMinimumMessageSize() {
701            int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
702            //let destination override
703            MessageDestination dest = regionDestination;
704            if (dest != null) {
705                result=dest.getMinimumMessageSize();
706            }
707            return result;
708        }
709    
710        /**
711         * @openwire:property version=1
712         * @return Returns the recievedByDFBridge.
713         */
714        public boolean isRecievedByDFBridge() {
715            return recievedByDFBridge;
716        }
717    
718        /**
719         * @param recievedByDFBridge The recievedByDFBridge to set.
720         */
721        public void setRecievedByDFBridge(boolean recievedByDFBridge) {
722            this.recievedByDFBridge = recievedByDFBridge;
723        }
724    
725        public void onMessageRolledBack() {
726            incrementRedeliveryCounter();
727        }
728    
729        /**
730         * @openwire:property version=2 cache=true
731         */
732        public boolean isDroppable() {
733            return droppable;
734        }
735    
736        public void setDroppable(boolean droppable) {
737            this.droppable = droppable;
738        }
739    
740        /**
741         * If a message is stored in multiple nodes on a cluster, all the cluster
742         * members will be listed here. Otherwise, it will be null.
743         *
744         * @openwire:property version=3 cache=true
745         */
746        public BrokerId[] getCluster() {
747            return cluster;
748        }
749    
750        public void setCluster(BrokerId[] cluster) {
751            this.cluster = cluster;
752        }
753    
754        @Override
755        public boolean isMessage() {
756            return true;
757        }
758    
759        /**
760         * @openwire:property version=3
761         */
762        public long getBrokerInTime() {
763            return this.brokerInTime;
764        }
765    
766        public void setBrokerInTime(long brokerInTime) {
767            this.brokerInTime = brokerInTime;
768        }
769    
770        /**
771         * @openwire:property version=3
772         */
773        public long getBrokerOutTime() {
774            return this.brokerOutTime;
775        }
776    
777        public void setBrokerOutTime(long brokerOutTime) {
778            this.brokerOutTime = brokerOutTime;
779        }
780    
781        @Override
782            public boolean isDropped() {
783            return false;
784        }
785    
786        /**
787         * @openwire:property version=10
788         */
789        public boolean isJMSXGroupFirstForConsumer() {
790            return jmsXGroupFirstForConsumer;
791        }
792    
793        public void setJMSXGroupFirstForConsumer(boolean val) {
794            jmsXGroupFirstForConsumer = val;
795        }
796    
797        public void compress() throws IOException {
798            if (!isCompressed()) {
799                storeContent();
800                if (!isCompressed() && getContent() != null) {
801                    doCompress();
802                }
803            }
804        }
805    
806        protected void doCompress() throws IOException {
807            compressed = true;
808            ByteSequence bytes = getContent();
809            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
810            OutputStream os = new DeflaterOutputStream(bytesOut);
811            os.write(bytes.data, bytes.offset, bytes.length);
812            os.close();
813            setContent(bytesOut.toByteSequence());
814        }
815    
816        @Override
817        public String toString() {
818            return toString(null);
819        }
820    
821        @Override
822        public String toString(Map<String, Object>overrideFields) {
823            try {
824                getProperties();
825            } catch (IOException e) {
826            }
827            return super.toString(overrideFields);
828        }
829    }