001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.amq;
018    
019    import java.io.IOException;
020    
021    import org.apache.activemq.broker.ConnectionContext;
022    import org.apache.activemq.command.ActiveMQTopic;
023    import org.apache.activemq.command.JournalTopicAck;
024    import org.apache.activemq.command.Message;
025    import org.apache.activemq.command.MessageAck;
026    import org.apache.activemq.command.MessageId;
027    import org.apache.activemq.command.SubscriptionInfo;
028    import org.apache.activemq.filter.BooleanExpression;
029    import org.apache.activemq.filter.MessageEvaluationContext;
030    import org.apache.activemq.kaha.impl.async.Location;
031    import org.apache.activemq.selector.SelectorParser;
032    import org.apache.activemq.store.MessageRecoveryListener;
033    import org.apache.activemq.store.TopicMessageStore;
034    import org.apache.activemq.store.TopicReferenceStore;
035    import org.apache.activemq.transaction.Synchronization;
036    import org.apache.activemq.util.IOExceptionSupport;
037    import org.apache.activemq.util.SubscriptionKey;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
/**
 * A MessageStore that uses a Journal to store its messages.
 */
046    public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore {
047    
048        private static final Logger LOG = LoggerFactory.getLogger(AMQTopicMessageStore.class);
049        private TopicReferenceStore topicReferenceStore;
050        public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
051            super(adapter, topicReferenceStore, destinationName);
052            this.topicReferenceStore = topicReferenceStore;
053        }
054    
055        public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
056            flush();
057            topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
058        }
059    
060        public void recoverNextMessages(String clientId, String subscriptionName,
061                int maxReturned, final MessageRecoveryListener listener)
062                throws Exception {
063            RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
064                topicReferenceStore.recoverNextMessages(clientId, subscriptionName,maxReturned, recoveryListener);
065                if (recoveryListener.size() == 0) {
066                    flush();
067                    topicReferenceStore.recoverNextMessages(clientId,subscriptionName, maxReturned, recoveryListener);
068                }
069        }
070    
071        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
072            return topicReferenceStore.lookupSubscription(clientId, subscriptionName);
073        }
074    
075        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
076            peristenceAdapter.writeCommand(subscriptionInfo, false);
077            topicReferenceStore.addSubsciption(subscriptionInfo, retroactive);
078        }
079    
080        /**
081         */
082        public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName,
083                                final MessageId messageId, final MessageAck originalAck) throws IOException {
084            final boolean debug = LOG.isDebugEnabled();
085            JournalTopicAck ack = new JournalTopicAck();
086            ack.setDestination(destination);
087            ack.setMessageId(messageId);
088            ack.setMessageSequenceId(messageId.getBrokerSequenceId());
089            ack.setSubscritionName(subscriptionName);
090            ack.setClientId(clientId);
091            ack.setTransactionId(context.getTransaction() != null ? context.getTransaction().getTransactionId() : null);
092            final Location location = peristenceAdapter.writeCommand(ack, false);
093            final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
094            if (!context.isInTransaction()) {
095                if (debug) {
096                    LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
097                }
098                acknowledge(context,messageId, location, clientId,subscriptionName);
099            } else {
100                if (debug) {
101                    LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
102                }
103                lock.lock();
104                try {
105                    inFlightTxLocations.add(location);
106                }finally {
107                    lock.unlock();
108                }
109                transactionStore.acknowledge(this, ack, location);
110                context.getTransaction().addSynchronization(new Synchronization() {
111    
112                    public void afterCommit() throws Exception {
113                        if (debug) {
114                            LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
115                        }
116                        lock.lock();
117                        try {
118                            inFlightTxLocations.remove(location);
119                            acknowledge(context,messageId, location, clientId,subscriptionName);
120                        }finally {
121                            lock.unlock();
122                        }
123                    }
124    
125                    public void afterRollback() throws Exception {
126                        if (debug) {
127                            LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
128                        }
129                        lock.lock();
130                        try{
131                            inFlightTxLocations.remove(location);
132                        }finally {
133                            lock.unlock();
134                        }
135                    }
136                });
137            }
138        }
139    
140        public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
141            try {
142                SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
143                if (sub != null) {
144                    topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId, null);
145                    return true;
146                }
147            } catch (Throwable e) {
148                LOG.debug("Could not replay acknowledge for message '" + messageId + "'.  Message may have already been acknowledged. reason: " + e);
149            }
150            return false;
151        }
152    
153        /**
154         * @param messageId
155         * @param location
156         * @param key
157         * @throws IOException 
158         */
159        protected void acknowledge(final ConnectionContext context, MessageId messageId,
160                Location location, String clientId, String subscriptionName)
161                throws IOException {
162            MessageAck ack = null;
163            lock.lock();
164            try {
165                lastLocation = location;
166            }finally {
167                lock.unlock();
168            }
169            
170                if (topicReferenceStore.acknowledgeReference(context, clientId,
171                        subscriptionName, messageId)) {
172                    ack = new MessageAck();
173                    ack.setLastMessageId(messageId);
174                   
175                }
176            
177            if (ack != null) {
178                removeMessage(context, ack);
179            }
180        }
181    
182        /**
183         * @return Returns the longTermStore.
184         */
185        public TopicReferenceStore getTopicReferenceStore() {
186            return topicReferenceStore;
187        }
188    
189        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
190            topicReferenceStore.deleteSubscription(clientId, subscriptionName);
191        }
192    
193        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
194            return topicReferenceStore.getAllSubscriptions();
195        }
196    
197        public int getMessageCount(String clientId, String subscriberName) throws IOException {
198            flush();
199            SubscriptionInfo info = lookupSubscription(clientId, subscriberName);
200            try {
201                MessageCounter counter = new MessageCounter(info, this);
202                topicReferenceStore.recoverSubscription(clientId, subscriberName, counter);
203                return counter.count;
204            } catch (Exception e) {
205                throw IOExceptionSupport.create(e);
206            }
207        }
208        
209        private class MessageCounter implements MessageRecoveryListener {
210            
211            int count = 0;
212            SubscriptionInfo info;
213            BooleanExpression selectorExpression;
214            TopicMessageStore store;
215            
216            public MessageCounter(SubscriptionInfo info, TopicMessageStore store) throws Exception {
217                this.info = info;
218                if (info != null) {
219                    String selector = info.getSelector();
220                    if (selector != null) {
221                        this.selectorExpression = SelectorParser.parse(selector);
222                    }
223                }
224                this.store = store;
225            }
226    
227            public boolean recoverMessageReference(MessageId ref) throws Exception {
228                if (selectorExpression != null) {
229                    MessageEvaluationContext ctx = new MessageEvaluationContext();
230                    ctx.setMessageReference(store.getMessage(ref));
231                    if (selectorExpression.matches(ctx)) {
232                        count++;
233                    }
234                } else {
235                    count ++;
236                }
237                return true;
238            }
239    
240            public boolean recoverMessage(Message message) throws Exception {
241                if (selectorExpression != null) {
242                    MessageEvaluationContext ctx = new MessageEvaluationContext();
243                    ctx.setMessageReference(store.getMessage(message.getMessageId()));
244                    if (selectorExpression.matches(ctx)) {
245                        count++;
246                    }
247                } else {
248                    count++;
249                }
250                return true;
251            }
252    
253            public boolean isDuplicate(MessageId ref) {
254                return false;
255            }
256    
257            public boolean hasSpace() {
258                return true;
259            }
260        }
261    
262        public void resetBatching(String clientId, String subscriptionName) {
263            topicReferenceStore.resetBatching(clientId, subscriptionName);
264        }
265    }