001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.journal;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Collections;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.LinkedHashMap;
025    import java.util.List;
026    import java.util.Map;
027    import java.util.Set;
028    
029    import org.apache.activeio.journal.RecordLocation;
030    import org.apache.activemq.broker.ConnectionContext;
031    import org.apache.activemq.command.ActiveMQDestination;
032    import org.apache.activemq.command.JournalQueueAck;
033    import org.apache.activemq.command.Message;
034    import org.apache.activemq.command.MessageAck;
035    import org.apache.activemq.command.MessageId;
036    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
037    import org.apache.activemq.store.MessageRecoveryListener;
038    import org.apache.activemq.store.MessageStore;
039    import org.apache.activemq.store.PersistenceAdapter;
040    import org.apache.activemq.store.AbstractMessageStore;
041    import org.apache.activemq.transaction.Synchronization;
042    import org.apache.activemq.usage.MemoryUsage;
043    import org.apache.activemq.usage.SystemUsage;
044    import org.apache.activemq.util.Callback;
045    import org.apache.activemq.util.TransactionTemplate;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * A MessageStore that uses a Journal to store it's messages.
051     * 
052     * 
053     */
054    public class JournalMessageStore extends AbstractMessageStore {
055    
056        private static final Logger LOG = LoggerFactory.getLogger(JournalMessageStore.class);
057    
058        protected final JournalPersistenceAdapter peristenceAdapter;
059        protected final JournalTransactionStore transactionStore;
060        protected final MessageStore longTermStore;
061        protected final TransactionTemplate transactionTemplate;
062        protected RecordLocation lastLocation;
063        protected Set<RecordLocation> inFlightTxLocations = new HashSet<RecordLocation>();
064    
065        private Map<MessageId, Message> messages = new LinkedHashMap<MessageId, Message>();
066        private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
067    
068        /** A MessageStore that we can use to retrieve messages quickly. */
069        private Map<MessageId, Message> cpAddedMessageIds;
070    
071    
072        private MemoryUsage memoryUsage;
073    
074        public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
075            super(destination);
076            this.peristenceAdapter = adapter;
077            this.transactionStore = adapter.getTransactionStore();
078            this.longTermStore = checkpointStore;
079            this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
080        }
081    
082        
083        public void setMemoryUsage(MemoryUsage memoryUsage) {
084            this.memoryUsage=memoryUsage;
085            longTermStore.setMemoryUsage(memoryUsage);
086        }
087    
088        /**
089         * Not synchronized since the Journal has better throughput if you increase
090         * the number of concurrent writes that it is doing.
091         */
092        public void addMessage(ConnectionContext context, final Message message) throws IOException {
093    
094            final MessageId id = message.getMessageId();
095    
096            final boolean debug = LOG.isDebugEnabled();
097            message.incrementReferenceCount();
098    
099            final RecordLocation location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
100            if (!context.isInTransaction()) {
101                if (debug) {
102                    LOG.debug("Journalled message add for: " + id + ", at: " + location);
103                }
104                addMessage(message, location);
105            } else {
106                if (debug) {
107                    LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
108                }
109                synchronized (this) {
110                    inFlightTxLocations.add(location);
111                }
112                transactionStore.addMessage(this, message, location);
113                context.getTransaction().addSynchronization(new Synchronization() {
114                    public void afterCommit() throws Exception {
115                        if (debug) {
116                            LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
117                        }
118                        synchronized (JournalMessageStore.this) {
119                            inFlightTxLocations.remove(location);
120                            addMessage(message, location);
121                        }
122                    }
123    
124                    public void afterRollback() throws Exception {
125                        if (debug) {
126                            LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
127                        }
128                        synchronized (JournalMessageStore.this) {
129                            inFlightTxLocations.remove(location);
130                        }
131                        message.decrementReferenceCount();
132                    }
133                });
134            }
135        }
136    
137        void addMessage(final Message message, final RecordLocation location) {
138            synchronized (this) {
139                lastLocation = location;
140                MessageId id = message.getMessageId();
141                messages.put(id, message);
142            }
143        }
144    
145        public void replayAddMessage(ConnectionContext context, Message message) {
146            try {
147                // Only add the message if it has not already been added.
148                Message t = longTermStore.getMessage(message.getMessageId());
149                if (t == null) {
150                    longTermStore.addMessage(context, message);
151                }
152            } catch (Throwable e) {
153                LOG.warn("Could not replay add for message '" + message.getMessageId() + "'.  Message may have already been added. reason: " + e);
154            }
155        }
156    
157        /**
158         */
159        public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
160            final boolean debug = LOG.isDebugEnabled();
161            JournalQueueAck remove = new JournalQueueAck();
162            remove.setDestination(destination);
163            remove.setMessageAck(ack);
164    
165            final RecordLocation location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
166            if (!context.isInTransaction()) {
167                if (debug) {
168                    LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
169                }
170                removeMessage(ack, location);
171            } else {
172                if (debug) {
173                    LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
174                }
175                synchronized (this) {
176                    inFlightTxLocations.add(location);
177                }
178                transactionStore.removeMessage(this, ack, location);
179                context.getTransaction().addSynchronization(new Synchronization() {
180                    public void afterCommit() throws Exception {
181                        if (debug) {
182                            LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
183                        }
184                        synchronized (JournalMessageStore.this) {
185                            inFlightTxLocations.remove(location);
186                            removeMessage(ack, location);
187                        }
188                    }
189    
190                    public void afterRollback() throws Exception {
191                        if (debug) {
192                            LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
193                        }
194                        synchronized (JournalMessageStore.this) {
195                            inFlightTxLocations.remove(location);
196                        }
197                    }
198                });
199    
200            }
201        }
202    
203        final void removeMessage(final MessageAck ack, final RecordLocation location) {
204            synchronized (this) {
205                lastLocation = location;
206                MessageId id = ack.getLastMessageId();
207                Message message = messages.remove(id);
208                if (message == null) {
209                    messageAcks.add(ack);
210                } else {
211                    message.decrementReferenceCount();
212                }
213            }
214        }
215    
216        public void replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
217            try {
218                // Only remove the message if it has not already been removed.
219                Message t = longTermStore.getMessage(messageAck.getLastMessageId());
220                if (t != null) {
221                    longTermStore.removeMessage(context, messageAck);
222                }
223            } catch (Throwable e) {
224                LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId() + "'.  Message may have already been acknowledged. reason: " + e);
225            }
226        }
227    
228        /**
229         * @return
230         * @throws IOException
231         */
232        public RecordLocation checkpoint() throws IOException {
233            return checkpoint(null);
234        }
235    
236        /**
237         * @return
238         * @throws IOException
239         */
240        @SuppressWarnings("unchecked")
241        public RecordLocation checkpoint(final Callback postCheckpointTest) throws IOException {
242    
243            final List<MessageAck> cpRemovedMessageLocations;
244            final List<RecordLocation> cpActiveJournalLocations;
245            final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
246    
247            // swap out the message hash maps..
248            synchronized (this) {
249                cpAddedMessageIds = this.messages;
250                cpRemovedMessageLocations = this.messageAcks;
251    
252                cpActiveJournalLocations = new ArrayList<RecordLocation>(inFlightTxLocations);
253    
254                this.messages = new LinkedHashMap<MessageId, Message>();
255                this.messageAcks = new ArrayList<MessageAck>();
256            }
257    
258            transactionTemplate.run(new Callback() {
259                public void execute() throws Exception {
260    
261                    int size = 0;
262    
263                    PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
264                    ConnectionContext context = transactionTemplate.getContext();
265    
266                    // Checkpoint the added messages.
267                    synchronized (JournalMessageStore.this) {
268                        Iterator<Message> iterator = cpAddedMessageIds.values().iterator();
269                        while (iterator.hasNext()) {
270                            Message message = iterator.next();
271                            try {
272                                longTermStore.addMessage(context, message);
273                            } catch (Throwable e) {
274                                LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
275                            }
276                            size += message.getSize();
277                            message.decrementReferenceCount();
278                            // Commit the batch if it's getting too big
279                            if (size >= maxCheckpointMessageAddSize) {
280                                persitanceAdapter.commitTransaction(context);
281                                persitanceAdapter.beginTransaction(context);
282                                size = 0;
283                            }
284                        }
285                    }
286    
287                    persitanceAdapter.commitTransaction(context);
288                    persitanceAdapter.beginTransaction(context);
289    
290                    // Checkpoint the removed messages.
291                    Iterator<MessageAck> iterator = cpRemovedMessageLocations.iterator();
292                    while (iterator.hasNext()) {
293                        try {
294                            MessageAck ack = iterator.next();
295                            longTermStore.removeMessage(transactionTemplate.getContext(), ack);
296                        } catch (Throwable e) {
297                            LOG.debug("Message could not be removed from long term store: " + e.getMessage(), e);
298                        }
299                    }
300    
301                    if (postCheckpointTest != null) {
302                        postCheckpointTest.execute();
303                    }
304                }
305    
306            });
307    
308            synchronized (this) {
309                cpAddedMessageIds = null;
310            }
311    
312            if (cpActiveJournalLocations.size() > 0) {
313                Collections.sort(cpActiveJournalLocations);
314                return cpActiveJournalLocations.get(0);
315            }
316            synchronized (this) {
317                return lastLocation;
318            }
319        }
320    
321        /**
322         * 
323         */
324        public Message getMessage(MessageId identity) throws IOException {
325            Message answer = null;
326    
327            synchronized (this) {
328                // Do we have a still have it in the journal?
329                answer = messages.get(identity);
330                if (answer == null && cpAddedMessageIds != null) {
331                    answer = cpAddedMessageIds.get(identity);
332                }
333            }
334    
335            if (answer != null) {
336                return answer;
337            }
338    
339            // If all else fails try the long term message store.
340            return longTermStore.getMessage(identity);
341        }
342    
343        /**
344         * Replays the checkpointStore first as those messages are the oldest ones,
345         * then messages are replayed from the transaction log and then the cache is
346         * updated.
347         * 
348         * @param listener
349         * @throws Exception
350         */
351        public void recover(final MessageRecoveryListener listener) throws Exception {
352            peristenceAdapter.checkpoint(true, true);
353            longTermStore.recover(listener);
354        }
355    
356        public void start() throws Exception {
357            if (this.memoryUsage != null) {
358                this.memoryUsage.addUsageListener(peristenceAdapter);
359            }
360            longTermStore.start();
361        }
362    
363        public void stop() throws Exception {
364            longTermStore.stop();
365            if (this.memoryUsage != null) {
366                this.memoryUsage.removeUsageListener(peristenceAdapter);
367            }
368        }
369    
370        /**
371         * @return Returns the longTermStore.
372         */
373        public MessageStore getLongTermMessageStore() {
374            return longTermStore;
375        }
376    
377        /**
378         * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
379         */
380        public void removeAllMessages(ConnectionContext context) throws IOException {
381            peristenceAdapter.checkpoint(true, true);
382            longTermStore.removeAllMessages(context);
383        }
384    
385        public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
386            throw new IOException("The journal does not support message references.");
387        }
388    
389        public String getMessageReference(MessageId identity) throws IOException {
390            throw new IOException("The journal does not support message references.");
391        }
392    
393        /**
394         * @return
395         * @throws IOException
396         * @see org.apache.activemq.store.MessageStore#getMessageCount()
397         */
398        public int getMessageCount() throws IOException {
399            peristenceAdapter.checkpoint(true, true);
400            return longTermStore.getMessageCount();
401        }
402    
403        public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
404            peristenceAdapter.checkpoint(true, true);
405            longTermStore.recoverNextMessages(maxReturned, listener);
406    
407        }
408    
409        public void resetBatching() {
410            longTermStore.resetBatching();
411    
412        }
413    
414        @Override
415        public void setBatch(MessageId messageId) throws Exception {
416            peristenceAdapter.checkpoint(true, true);
417            longTermStore.setBatch(messageId);
418        }
419    
420    }