001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.journal;
018    
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
038    
039    /**
040     * A MessageStore that uses a Journal to store it's messages.
041     * 
042     * 
043     */
044    public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
045    
046        private static final Logger LOG = LoggerFactory.getLogger(JournalTopicMessageStore.class);
047    
048        private TopicMessageStore longTermStore;
049        private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
050    
051        public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore,
052                                        ActiveMQTopic destinationName) {
053            super(adapter, checkpointStore, destinationName);
054            this.longTermStore = checkpointStore;
055        }
056    
057        public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener)
058            throws Exception {
059            this.peristenceAdapter.checkpoint(true, true);
060            longTermStore.recoverSubscription(clientId, subscriptionName, listener);
061        }
062    
063        public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
064                                        MessageRecoveryListener listener) throws Exception {
065            this.peristenceAdapter.checkpoint(true, true);
066            longTermStore.recoverNextMessages(clientId, subscriptionName, maxReturned, listener);
067    
068        }
069    
070        public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
071            return longTermStore.lookupSubscription(clientId, subscriptionName);
072        }
073    
074        public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
075            this.peristenceAdapter.checkpoint(true, true);
076            longTermStore.addSubsciption(subscriptionInfo, retroactive);
077        }
078    
079        public void addMessage(ConnectionContext context, Message message) throws IOException {
080            super.addMessage(context, message);
081        }
082    
083        /**
084         */
085        public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
086                                final MessageId messageId, MessageAck originalAck) throws IOException {
087            final boolean debug = LOG.isDebugEnabled();
088    
089            JournalTopicAck ack = new JournalTopicAck();
090            ack.setDestination(destination);
091            ack.setMessageId(messageId);
092            ack.setMessageSequenceId(messageId.getBrokerSequenceId());
093            ack.setSubscritionName(subscriptionName);
094            ack.setClientId(clientId);
095            ack.setTransactionId(context.getTransaction() != null
096                ? context.getTransaction().getTransactionId() : null);
097            final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
098    
099            final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
100            if (!context.isInTransaction()) {
101                if (debug) {
102                    LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
103                }
104                acknowledge(messageId, location, key);
105            } else {
106                if (debug) {
107                    LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
108                }
109                synchronized (this) {
110                    inFlightTxLocations.add(location);
111                }
112                transactionStore.acknowledge(this, ack, location);
113                context.getTransaction().addSynchronization(new Synchronization() {
114                    public void afterCommit() throws Exception {
115                        if (debug) {
116                            LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
117                        }
118                        synchronized (JournalTopicMessageStore.this) {
119                            inFlightTxLocations.remove(location);
120                            acknowledge(messageId, location, key);
121                        }
122                    }
123    
124                    public void afterRollback() throws Exception {
125                        if (debug) {
126                            LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
127                        }
128                        synchronized (JournalTopicMessageStore.this) {
129                            inFlightTxLocations.remove(location);
130                        }
131                    }
132                });
133            }
134    
135        }
136    
137        public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName,
138                                      MessageId messageId) {
139            try {
140                SubscriptionInfo sub = longTermStore.lookupSubscription(clientId, subscritionName);
141                if (sub != null) {
142                    longTermStore.acknowledge(context, clientId, subscritionName, messageId, null);
143                }
144            } catch (Throwable e) {
145                LOG.debug("Could not replay acknowledge for message '" + messageId
146                          + "'.  Message may have already been acknowledged. reason: " + e);
147            }
148        }
149    
150        /**
151         * @param messageId
152         * @param location
153         * @param key
154         */
155        protected void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
156            synchronized (this) {
157                lastLocation = location;
158                ackedLastAckLocations.put(key, messageId);
159            }
160        }
161    
162        public RecordLocation checkpoint() throws IOException {
163    
164            final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
165    
166            // swap out the hash maps..
167            synchronized (this) {
168                cpAckedLastAckLocations = this.ackedLastAckLocations;
169                this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
170            }
171    
172            return super.checkpoint(new Callback() {
173                public void execute() throws Exception {
174    
175                    // Checkpoint the acknowledged messages.
176                    Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
177                    while (iterator.hasNext()) {
178                        SubscriptionKey subscriptionKey = iterator.next();
179                        MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
180                        longTermStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
181                                                  subscriptionKey.subscriptionName, identity, null);
182                    }
183    
184                }
185            });
186    
187        }
188    
189        /**
190         * @return Returns the longTermStore.
191         */
192        public TopicMessageStore getLongTermTopicMessageStore() {
193            return longTermStore;
194        }
195    
196        public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
197            longTermStore.deleteSubscription(clientId, subscriptionName);
198        }
199    
200        public SubscriptionInfo[] getAllSubscriptions() throws IOException {
201            return longTermStore.getAllSubscriptions();
202        }
203    
204        public int getMessageCount(String clientId, String subscriberName) throws IOException {
205            this.peristenceAdapter.checkpoint(true, true);
206            return longTermStore.getMessageCount(clientId, subscriberName);
207        }
208    
209        public void resetBatching(String clientId, String subscriptionName) {
210            longTermStore.resetBatching(clientId, subscriptionName);
211        }
212    
213    }