001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region;
018    
019    import org.apache.activemq.command.ConsumerId;
020    import org.apache.activemq.command.Message;
021    import org.apache.activemq.command.MessageId;
022    
023    /**
024     * Keeps track of a message that is flowing through the Broker. This object may
025     * hold a hard reference to the message or only hold the id of the message if
026     * the message has been persisted on in a MessageStore.
027     * 
028     * 
029     */
030    public class IndirectMessageReference implements QueueMessageReference {
031    
032        /** The subscription that has locked the message */
033        private LockOwner lockOwner;
034        /** Has the message been dropped? */
035        private boolean dropped;
036        /** Has the message been acked? */
037        private boolean acked;
038        /** Direct reference to the message */
039        private final Message message;
040        private final MessageId messageId;
041        
042        /**
043         * @param message
044         */
045        public IndirectMessageReference(final Message message) {
046            this.message = message;
047            this.messageId = message.getMessageId().copy();
048            message.getMessageId();
049            message.getGroupID();
050            message.getGroupSequence();
051        }
052    
053        public Message getMessageHardRef() {
054            return message;
055        }
056    
057        public int getReferenceCount() {
058            return message.getReferenceCount();
059        }
060    
061        public int incrementReferenceCount() {
062            return message.incrementReferenceCount();
063        }
064    
065        public int decrementReferenceCount() {
066            return message.decrementReferenceCount();
067        }
068    
069        public Message getMessage() {
070            return message;
071        }
072    
073        public String toString() {
074            return "Message " + message.getMessageId() + " dropped=" + dropped + " acked=" + acked + " locked=" + (lockOwner != null);
075        }
076    
077        public void incrementRedeliveryCounter() {
078            message.incrementRedeliveryCounter();
079        }
080    
081        public synchronized boolean isDropped() {
082            return dropped;
083        }
084    
085        public synchronized void drop() {
086            dropped = true;
087            lockOwner = null;
088            message.decrementReferenceCount();
089        }
090    
091        public boolean lock(LockOwner subscription) {
092            synchronized (this) {
093                if (dropped || lockOwner != null) {
094                    return false;
095                }
096                lockOwner = subscription;
097                return true;
098            }
099        }
100    
101        public synchronized boolean unlock() {
102            boolean result = lockOwner != null;
103            lockOwner = null;
104            return result;
105        }
106    
107        public synchronized LockOwner getLockOwner() {
108            return lockOwner;
109        }
110    
111        public int getRedeliveryCounter() {
112            return message.getRedeliveryCounter();
113        }
114    
115        public MessageId getMessageId() {
116            return messageId;
117        }
118    
119        public Message.MessageDestination getRegionDestination() {
120            return message.getRegionDestination();
121        }
122    
123        public boolean isPersistent() {
124            return message.isPersistent();
125        }
126    
127        public synchronized boolean isLocked() {
128            return lockOwner != null;
129        }
130    
131        public synchronized boolean isAcked() {
132            return acked;
133        }
134    
135        public synchronized void setAcked(boolean b) {
136            acked = b;
137        }
138    
139        public String getGroupID() {
140            return message.getGroupID();
141        }
142    
143        public int getGroupSequence() {
144            return message.getGroupSequence();
145        }
146    
147        public ConsumerId getTargetConsumerId() {
148            return message.getTargetConsumerId();
149        }
150    
151        public long getExpiration() {
152            return message.getExpiration();
153        }
154    
155        public boolean isExpired() {
156            return message.isExpired();
157        }
158    
159        public synchronized int getSize() {
160           return message.getSize();
161        }
162    
163        public boolean isAdvisory() {
164           return message.isAdvisory();
165        }
166    }