001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.journal;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Collections;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.LinkedHashMap;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Set;
028    
029    import org.apache.activeio.journal.RecordLocation;
030    import org.apache.activemq.broker.ConnectionContext;
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.JournalQueueAck;
033    import org.apache.activemq.command.Message;
034    import org.apache.activemq.command.MessageAck;
035    import org.apache.activemq.command.MessageId;
036    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
037    import org.apache.activemq.store.MessageRecoveryListener;
038    import org.apache.activemq.store.MessageStore;
039    import org.apache.activemq.store.PersistenceAdapter;
040    import org.apache.activemq.store.AbstractMessageStore;
041    import org.apache.activemq.transaction.Synchronization;
042    import org.apache.activemq.usage.MemoryUsage;
043    import org.apache.activemq.util.Callback;
044    import org.apache.activemq.util.TransactionTemplate;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * A MessageStore that uses a Journal to store it's messages.
050     * 
051     * 
052     */
053    public class JournalMessageStore extends AbstractMessageStore {
054    
055        private static final Logger LOG = LoggerFactory.getLogger(JournalMessageStore.class);
056    
057        protected final JournalPersistenceAdapter peristenceAdapter;
058        protected final JournalTransactionStore transactionStore;
059        protected final MessageStore longTermStore;
060        protected final TransactionTemplate transactionTemplate;
061        protected RecordLocation lastLocation;
062        protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>();
063    
064        private Map<MessageId, Message> messages = new LinkedHashMap<MessageId, Message>();
065        private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
066    
067        /** A MessageStore that we can use to retrieve messages quickly. */
068        private Map<MessageId, Message> cpAddedMessageIds;
069    
070    
071        private MemoryUsage memoryUsage;
072    
073        public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
074            super(destination);
075            this.peristenceAdapter = adapter;
076            this.transactionStore = adapter.getTransactionStore();
077            this.longTermStore = checkpointStore;
078            this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
079        }
080    
081        
082        public void setMemoryUsage(MemoryUsage memoryUsage) {
083            this.memoryUsage=memoryUsage;
084            longTermStore.setMemoryUsage(memoryUsage);
085        }
086    
087        /**
088         * Not synchronized since the Journal has better throughput if you increase
089         * the number of concurrent writes that it is doing.
090         */
091        public void addMessage(ConnectionContext context, final Message message) throws IOException {
092    
093            final MessageId id = message.getMessageId();
094    
095            final boolean debug = LOG.isDebugEnabled();
096            message.incrementReferenceCount();
097    
098            final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
099            if (!context.isInTransaction()) {
100                if (debug) {
101                    LOG.debug("Journalled message add for: " + id + ", at: " + location);
102                }
103                addMessage(message, location);
104            } else {
105                if (debug) {
106                    LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
107                }
108                synchronized (this) {
109                    inFlightTxLocations.add(location);
110                }
111                transactionStore.addMessage(this, message, location);
112                context.getTransaction().addSynchronization(new Synchronization() {
113                    public void afterCommit() throws Exception {
114                        if (debug) {
115                            LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
116                        }
117                        synchronized (JournalMessageStore.this) {
118                            inFlightTxLocations.remove(location);
119                            addMessage(message, location);
120                        }
121                    }
122    
123                    public void afterRollback() throws Exception {
124                        if (debug) {
125                            LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
126                        }
127                        synchronized (JournalMessageStore.this) {
128                            inFlightTxLocations.remove(location);
129                        }
130                        message.decrementReferenceCount();
131                    }
132                });
133            }
134        }
135    
136        void addMessage(final Message message, final RecordLocation location) {
137            synchronized (this) {
138                lastLocation = location;
139                MessageId id = message.getMessageId();
140                messages.put(id, message);
141            }
142        }
143    
144        public void replayAddMessage(ConnectionContext context, Message message) {
145            try {
146                // Only add the message if it has not already been added.
147                Message t = longTermStore.getMessage(message.getMessageId());
148                if (t == null) {
149                    longTermStore.addMessage(context, message);
150                }
151            } catch (Throwable e) {
152                LOG.warn("Could not replay add for message '" + message.getMessageId() + "'.  Message may have already been added. reason: " + e);
153            }
154        }
155    
156        /**
157         */
158        public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
159            final boolean debug = LOG.isDebugEnabled();
160            JournalQueueAck remove = new JournalQueueAck();
161            remove.setDestination(destination);
162            remove.setMessageAck(ack);
163    
164            final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
165            if (!context.isInTransaction()) {
166                if (debug) {
167                    LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
168                }
169                removeMessage(ack, location);
170            } else {
171                if (debug) {
172                    LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
173                }
174                synchronized (this) {
175                    inFlightTxLocations.add(location);
176                }
177                transactionStore.removeMessage(this, ack, location);
178                context.getTransaction().addSynchronization(new Synchronization() {
179                    public void afterCommit() throws Exception {
180                        if (debug) {
181                            LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
182                        }
183                        synchronized (JournalMessageStore.this) {
184                            inFlightTxLocations.remove(location);
185                            removeMessage(ack, location);
186                        }
187                    }
188    
189                    public void afterRollback() throws Exception {
190                        if (debug) {
191                            LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
192                        }
193                        synchronized (JournalMessageStore.this) {
194                            inFlightTxLocations.remove(location);
195                        }
196                    }
197                });
198    
199            }
200        }
201    
202        final void removeMessage(final MessageAck ack, final RecordLocation location) {
203            synchronized (this) {
204                lastLocation = location;
205                MessageId id = ack.getLastMessageId();
206                Message message = messages.remove(id);
207                if (message == null) {
208                    messageAcks.add(ack);
209                } else {
210                    message.decrementReferenceCount();
211                }
212            }
213        }
214    
215        public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
216            try {
217                // Only remove the message if it has not already been removed.
218                Message t = longTermStore.getMessage(messageAck.getLastMessageId());
219                if (t != null) {
220                    longTermStore.removeMessage(context, messageAck);
221                }
222            } catch (Throwable e) {
223                LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
224            }
225        }
226    
227        /**
228         * @return
229         * @throws IOException
230         */
231        public RecordLocation checkpoint() throws IOException {
232            return checkpoint(null);
233        }
234    
235        /**
236         * @return
237         * @throws IOException
238         */
239        @SuppressWarnings("unchecked")
240        public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException {
241    
242            final List<MessageAck> cpRemovedMessageLocations;
243            final List<RecordLocation> cpActiveJournalLocations;
244            final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
245    
246            // swap out the message hash maps..
247            synchronized (this) {
248                cpAddedMessageIds = this.messages;
249                cpRemovedMessageLocations = this.messageAcks;
250    
251                cpActiveJournalLocations = new ArrayList<RecordLocation>(inFlightTxLocations);
252    
253                this.messages = new LinkedHashMap<MessageId, Message>();
254                this.messageAcks = new ArrayList<MessageAck>();
255            }
256    
257            transactionTemplate.run(new Callback() {
258                public void execute() throws Exception {
259    
260                    int size = 0;
261    
262                    PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
263                    ConnectionContext context = transactionTemplate.getContext();
264    
265                    // Checkpoint the added messages.
266                    synchronized (JournalMessageStore.this) {
267                        Iterator<Message> iterator = cpAddedMessageIds.values().iterator();
268                        while (iterator.hasNext()) {
269                            Message message = iterator.next();
270                            try {
271                                longTermStore.addMessage(context, message);
272                            } catch (Throwable e) {
273                                LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
274                            }
275                            size += message.getSize();
276                            message.decrementReferenceCount();
277                            // Commit the batch if it's getting too big
278                            if (size >= maxCheckpointMessageAddSize) {
279                                persitanceAdapter.commitTransaction(context);
280                                persitanceAdapter.beginTransaction(context);
281                                size = 0;
282                            }
283                        }
284                    }
285    
286                    persitanceAdapter.commitTransaction(context);
287                    persitanceAdapter.beginTransaction(context);
288    
289                    // Checkpoint the removed messages.
290                    Iterator<MessageAck> iterator = cpRemovedMessageLocations.iterator();
291                    while (iterator.hasNext()) {
292                        try {
293                            MessageAck ack = iterator.next();
294                            longTermStore.removeMessage(transactionTemplate.getContext(), ack);
295                        } catch (Throwable e) {
296                            LOG.debug("Message could not be removed from long term store: " + e.getMessage(), e);
297                        }
298                    }
299    
300                    if (postCheckpointTest != null) {
301                        postCheckpointTest.execute();
302                    }
303                }
304    
305            });
306    
307            synchronized (this) {
308                cpAddedMessageIds = null;
309            }
310    
311            if (cpActiveJournalLocations.size() > 0) {
312                Collections.sort(cpActiveJournalLocations);
313                return cpActiveJournalLocations.get(0);
314            }
315            synchronized (this) {
316                return lastLocation;
317            }
318        }
319    
320        /**
321         * 
322         */
323        public Message getMessage(MessageId identity) throws IOException {
324            Message answer = null;
325    
326            synchronized (this) {
327                // Do we have a still have it in the journal?
328                answer = messages.get(identity);
329                if (answer == null && cpAddedMessageIds != null) {
330                    answer = cpAddedMessageIds.get(identity);
331                }
332            }
333    
334            if (answer != null) {
335                return answer;
336            }
337    
338            // If all else fails try the long term message store.
339            return longTermStore.getMessage(identity);
340        }
341    
342        /**
343         * Replays the checkpointStore first as those messages are the oldest ones,
344         * then messages are replayed from the transaction log and then the cache is
345         * updated.
346         * 
347         * @param listener
348         * @throws Exception
349         */
350        public void recover(final MessageRecoveryListener listener) throws Exception {
351            peristenceAdapter.checkpoint(true, true);
352            longTermStore.recover(listener);
353        }
354    
355        public void start() throws Exception {
356            if (this.memoryUsage != null) {
357                this.memoryUsage.addUsageListener(peristenceAdapter);
358            }
359            longTermStore.start();
360        }
361    
362        public void stop() throws Exception {
363            longTermStore.stop();
364            if (this.memoryUsage != null) {
365                this.memoryUsage.removeUsageListener(peristenceAdapter);
366            }
367        }
368    
369        /**
370         * @return Returns the longTermStore.
371         */
372        public MessageStore getLongTermMessageStore() {
373            return longTermStore;
374        }
375    
376        /**
377         * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
378         */
379        public void removeAllMessages(ConnectionContext context) throws IOException {
380            peristenceAdapter.checkpoint(true, true);
381            longTermStore.removeAllMessages(context);
382        }
383    
384        public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
385            throw new IOException("The journal does not support message references.");
386        }
387    
388        public String getMessageReference(MessageId identity) throws IOException {
389            throw new IOException("The journal does not support message references.");
390        }
391    
392        /**
393         * @return
394         * @throws IOException
395         * @see org.apache.activemq.store.MessageStore#getMessageCount()
396         */
397        public int getMessageCount() throws IOException {
398            peristenceAdapter.checkpoint(true, true);
399            return longTermStore.getMessageCount();
400        }
401    
402        public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
403            peristenceAdapter.checkpoint(true, true);
404            longTermStore.recoverNextMessages(maxReturned, listener);
405    
406        }
407    
408        public void resetBatching() {
409            longTermStore.resetBatching();
410    
411        }
412    
413        @Override
414        public void setBatch(MessageId messageId) throws Exception {
415            peristenceAdapter.checkpoint(true, true);
416            longTermStore.setBatch(messageId);
417        }
418    
419    }