001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.amq;
018    
019    import java.io.IOException;
020    import java.io.InterruptedIOException;
021    import java.util.ArrayList;
022    import java.util.Collections;
023    import java.util.HashSet;
024    import java.util.Iterator;
025    import java.util.LinkedHashMap;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Set;
029    import java.util.Map.Entry;
030    import java.util.concurrent.CountDownLatch;
031    import java.util.concurrent.atomic.AtomicReference;
032    import java.util.concurrent.locks.Lock;
033    import org.apache.activemq.broker.ConnectionContext;
034    import org.apache.activemq.command.ActiveMQDestination;
035    import org.apache.activemq.command.DataStructure;
036    import org.apache.activemq.command.JournalQueueAck;
037    import org.apache.activemq.command.Message;
038    import org.apache.activemq.command.MessageAck;
039    import org.apache.activemq.command.MessageId;
040    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
041    import org.apache.activemq.kaha.MessageAckWithLocation;
042    import org.apache.activemq.kaha.impl.async.Location;
043    import org.apache.activemq.store.AbstractMessageStore;
044    import org.apache.activemq.store.MessageRecoveryListener;
045    import org.apache.activemq.store.PersistenceAdapter;
046    import org.apache.activemq.store.ReferenceStore;
047    import org.apache.activemq.store.ReferenceStore.ReferenceData;
048    import org.apache.activemq.thread.Task;
049    import org.apache.activemq.thread.TaskRunner;
050    import org.apache.activemq.transaction.Synchronization;
051    import org.apache.activemq.usage.MemoryUsage;
052    import org.apache.activemq.util.Callback;
053    import org.apache.activemq.util.TransactionTemplate;
054    import org.slf4j.Logger;
055    import org.slf4j.LoggerFactory;
056    
057    /**
058     * A MessageStore that uses a Journal to store it's messages.
059     * 
060     * 
061     */
062    public class AMQMessageStore extends AbstractMessageStore {
063        private static final Logger LOG = LoggerFactory.getLogger(AMQMessageStore.class);
064        protected final AMQPersistenceAdapter peristenceAdapter;
065        protected final AMQTransactionStore transactionStore;
066        protected final ReferenceStore referenceStore;
067        protected final TransactionTemplate transactionTemplate;
068        protected Location lastLocation;
069        protected Location lastWrittenLocation;
070        protected Set<Location> inFlightTxLocations = new HashSet<Location>();
071        protected final TaskRunner asyncWriteTask;
072        protected CountDownLatch flushLatch;
073        private Map<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
074        private List<MessageAckWithLocation> messageAcks = new ArrayList<MessageAckWithLocation>();
075        /** A MessageStore that we can use to retrieve messages quickly. */
076        private Map<MessageId, ReferenceData> cpAddedMessageIds;
077        private final boolean debug = LOG.isDebugEnabled();
078        private final AtomicReference<Location> mark = new AtomicReference<Location>();
079        protected final Lock lock;
080    
081        public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) {
082            super(destination);
083            this.peristenceAdapter = adapter;
084            this.lock = referenceStore.getStoreLock();
085            this.transactionStore = adapter.getTransactionStore();
086            this.referenceStore = referenceStore;
087            this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(
088                    new NonCachedMessageEvaluationContext()));
089            asyncWriteTask = adapter.getTaskRunnerFactory().createTaskRunner(new Task() {
090                public boolean iterate() {
091                    asyncWrite();
092                    return false;
093                }
094            }, "Checkpoint: " + destination);
095        }
096    
097        public void setMemoryUsage(MemoryUsage memoryUsage) {
098            referenceStore.setMemoryUsage(memoryUsage);
099        }
100    
101        /**
102         * Not synchronize since the Journal has better throughput if you increase the number of concurrent writes that it
103         * is doing.
104         */
105        public final void addMessage(ConnectionContext context, final Message message) throws IOException {
106            final MessageId id = message.getMessageId();
107            final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
108            if (!context.isInTransaction()) {
109                if (debug) {
110                    LOG.debug("Journalled message add for: " + id + ", at: " + location);
111                }
112                this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
113                addMessage(message, location);
114            } else {
115                if (debug) {
116                    LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
117                }
118                lock.lock();
119                try {
120                    inFlightTxLocations.add(location);
121                } finally {
122                    lock.unlock();
123                }
124                transactionStore.addMessage(this, message, location);
125                context.getTransaction().addSynchronization(new Synchronization() {
126                    public void afterCommit() throws Exception {
127                        if (debug) {
128                            LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
129                        }
130                        lock.lock();
131                        try {
132                            inFlightTxLocations.remove(location);
133                        } finally {
134                            lock.unlock();
135                        }
136                        addMessage(message, location);
137                    }
138    
139                    public void afterRollback() throws Exception {
140                        if (debug) {
141                            LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
142                        }
143                        lock.lock();
144                        try {
145                            inFlightTxLocations.remove(location);
146                        } finally {
147                            lock.unlock();
148                        }
149                    }
150                });
151            }
152        }
153    
154        final void addMessage(final Message message, final Location location) throws InterruptedIOException {
155            ReferenceData data = new ReferenceData();
156            data.setExpiration(message.getExpiration());
157            data.setFileId(location.getDataFileId());
158            data.setOffset(location.getOffset());
159            lock.lock();
160            try {
161                lastLocation = location;
162                ReferenceData prev = messages.put(message.getMessageId(), data);
163                if (prev != null) {
164                    AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, prev.getFileId());
165                }
166            } finally {
167                lock.unlock();
168            }
169            if (messages.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
170                flush();
171            } else {
172                try {
173                    asyncWriteTask.wakeup();
174                } catch (InterruptedException e) {
175                    throw new InterruptedIOException();
176                }
177            }
178        }
179    
180        public boolean replayAddMessage(ConnectionContext context, Message message, Location location) {
181            MessageId id = message.getMessageId();
182            try {
183                // Only add the message if it has not already been added.
184                ReferenceData data = referenceStore.getMessageReference(id);
185                if (data == null) {
186                    data = new ReferenceData();
187                    data.setExpiration(message.getExpiration());
188                    data.setFileId(location.getDataFileId());
189                    data.setOffset(location.getOffset());
190                    referenceStore.addMessageReference(context, id, data);
191                    return true;
192                }
193            } catch (Throwable e) {
194                LOG.warn("Could not replay add for message '" + id + "'.  Message may have already been added. reason: "
195                        + e, e);
196            }
197            return false;
198        }
199    
200        /**
201         */
202        public void removeMessage(final ConnectionContext context, final MessageAck ack) throws IOException {
203            JournalQueueAck remove = new JournalQueueAck();
204            remove.setDestination(destination);
205            remove.setMessageAck(ack);
206            final Location location = peristenceAdapter.writeCommand(remove, ack.isResponseRequired());
207            if (!context.isInTransaction()) {
208                if (debug) {
209                    LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
210                }
211                removeMessage(ack, location);
212            } else {
213                if (debug) {
214                    LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
215                }
216                lock.lock();
217                try {
218                    inFlightTxLocations.add(location);
219                } finally {
220                    lock.unlock();
221                }
222                transactionStore.removeMessage(this, ack, location);
223                context.getTransaction().addSynchronization(new Synchronization() {
224                    public void afterCommit() throws Exception {
225                        if (debug) {
226                            LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: "
227                                    + location);
228                        }
229                        lock.lock();
230                        try {
231                            inFlightTxLocations.remove(location);
232                        } finally {
233                            lock.unlock();
234                        }
235                        removeMessage(ack, location);
236                    }
237    
238                    public void afterRollback() throws Exception {
239                        if (debug) {
240                            LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: "
241                                    + location);
242                        }
243                        lock.lock();
244                        try {
245                            inFlightTxLocations.remove(location);
246                        } finally {
247                            lock.unlock();
248                        }
249                    }
250                });
251            }
252        }
253    
254        final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
255            ReferenceData data;
256            lock.lock();
257            try {
258                lastLocation = location;
259                MessageId id = ack.getLastMessageId();
260                data = messages.remove(id);
261                if (data == null) {
262                    messageAcks.add(new MessageAckWithLocation(ack, location));
263                } else {
264                    // message never got written so datafileReference will still exist
265                    AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId());
266                }
267            } finally {
268                lock.unlock();
269            }
270            if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
271                flush();
272            } else if (data == null) {
273                try {
274                    asyncWriteTask.wakeup();
275                } catch (InterruptedException e) {
276                    throw new InterruptedIOException();
277                }
278            }
279        }
280    
281        public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
282            try {
283                // Only remove the message if it has not already been removed.
284                ReferenceData t = referenceStore.getMessageReference(messageAck.getLastMessageId());
285                if (t != null) {
286                    referenceStore.removeMessage(context, messageAck);
287                    return true;
288                }
289            } catch (Throwable e) {
290                LOG.warn("Could not replay acknowledge for message '" + messageAck.getLastMessageId()
291                        + "'.  Message may have already been acknowledged. reason: " + e);
292            }
293            return false;
294        }
295    
296        /**
297         * Waits till the lastest data has landed on the referenceStore
298         * 
299         * @throws InterruptedIOException
300         */
301        public void flush() throws InterruptedIOException {
302            if (LOG.isDebugEnabled()) {
303                LOG.debug("flush starting ...");
304            }
305            CountDownLatch countDown;
306            lock.lock();
307            try {
308                if (lastWrittenLocation == lastLocation) {
309                    return;
310                }
311                if (flushLatch == null) {
312                    flushLatch = new CountDownLatch(1);
313                }
314                countDown = flushLatch;
315            } finally {
316                lock.unlock();
317            }
318            try {
319                asyncWriteTask.wakeup();
320                countDown.await();
321            } catch (InterruptedException e) {
322                throw new InterruptedIOException();
323            }
324            if (LOG.isDebugEnabled()) {
325                LOG.debug("flush finished");
326            }
327        }
328    
329        /**
330         * @return
331         * @throws IOException
332         */
333        synchronized void asyncWrite() {
334            try {
335                CountDownLatch countDown;
336                lock.lock();
337                try {
338                    countDown = flushLatch;
339                    flushLatch = null;
340                } finally {
341                    lock.unlock();
342                }
343                mark.set(doAsyncWrite());
344                if (countDown != null) {
345                    countDown.countDown();
346                }
347            } catch (IOException e) {
348                LOG.error("Checkpoint failed: " + e, e);
349            }
350        }
351    
352        /**
353         * @return
354         * @throws IOException
355         */
356        protected Location doAsyncWrite() throws IOException {
357            final List<MessageAckWithLocation> cpRemovedMessageLocations;
358            final List<Location> cpActiveJournalLocations;
359            final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
360            final Location lastLocation;
361            // swap out the message hash maps..
362            lock.lock();
363            try {
364                cpAddedMessageIds = this.messages;
365                cpRemovedMessageLocations = this.messageAcks;
366                cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations);
367                this.messages = new LinkedHashMap<MessageId, ReferenceData>();
368                this.messageAcks = new ArrayList<MessageAckWithLocation>();
369                lastLocation = this.lastLocation;
370            } finally {
371                lock.unlock();
372            }
373            if (LOG.isDebugEnabled()) {
374                LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: "
375                        + cpRemovedMessageLocations.size() + " ");
376            }
377            transactionTemplate.run(new Callback() {
378                public void execute() throws Exception {
379                    int size = 0;
380                    PersistenceAdapter persitanceAdapter = transactionTemplate.getPersistenceAdapter();
381                    ConnectionContext context = transactionTemplate.getContext();
382                    // Checkpoint the added messages.
383                    Iterator<Entry<MessageId, ReferenceData>> iterator = cpAddedMessageIds.entrySet().iterator();
384                    while (iterator.hasNext()) {
385                        Entry<MessageId, ReferenceData> entry = iterator.next();
386                        try {
387                            if (referenceStore.addMessageReference(context, entry.getKey(), entry.getValue())) {
388                                if (LOG.isDebugEnabled()) {
389                                    LOG.debug("adding message ref:" + entry.getKey());
390                                }
391                                size++;
392                            } else {
393                                if (LOG.isDebugEnabled()) {
394                                    LOG.debug("not adding duplicate reference: " + entry.getKey() + ", " + entry.getValue());
395                                }
396                            }
397                            AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, entry
398                                    .getValue().getFileId());
399                        } catch (Throwable e) {
400                            LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
401                        }
402                        
403                        // Commit the batch if it's getting too big
404                        if (size >= maxCheckpointMessageAddSize) {
405                            persitanceAdapter.commitTransaction(context);
406                            persitanceAdapter.beginTransaction(context);
407                            size = 0;
408                        }
409                    }
410                    persitanceAdapter.commitTransaction(context);
411                    persitanceAdapter.beginTransaction(context);
412                    // Checkpoint the removed messages.
413                    for (MessageAckWithLocation ack : cpRemovedMessageLocations) {
414                        try {
415                            referenceStore.removeMessage(transactionTemplate.getContext(), ack);
416                        } catch (Throwable e) {
417                            LOG.warn("Message could not be removed from long term store: " + e.getMessage(), e);
418                        }
419                    }
420                }
421            });
422            LOG.debug("Batch update done. lastLocation:" + lastLocation);
423            lock.lock();
424            try {
425                cpAddedMessageIds = null;
426                lastWrittenLocation = lastLocation;
427            } finally {
428                lock.unlock();
429            }
430            if (cpActiveJournalLocations.size() > 0) {
431                Collections.sort(cpActiveJournalLocations);
432                return cpActiveJournalLocations.get(0);
433            } else {
434                return lastLocation;
435            }
436        }
437    
438        /**
439         * 
440         */
441        public Message getMessage(MessageId identity) throws IOException {
442            Location location = getLocation(identity);
443            if (location != null) {
444                DataStructure rc = peristenceAdapter.readCommand(location);
445                try {
446                    return (Message) rc;
447                } catch (ClassCastException e) {
448                    throw new IOException("Could not read message " + identity + " at location " + location
449                            + ", expected a message, but got: " + rc);
450                }
451            }
452            return null;
453        }
454    
455        protected Location getLocation(MessageId messageId) throws IOException {
456            ReferenceData data = null;
457            lock.lock();
458            try {
459                // Is it still in flight???
460                data = messages.get(messageId);
461                if (data == null && cpAddedMessageIds != null) {
462                    data = cpAddedMessageIds.get(messageId);
463                }
464            } finally {
465                lock.unlock();
466            }
467            if (data == null) {
468                data = referenceStore.getMessageReference(messageId);
469                if (data == null) {
470                    return null;
471                }
472            }
473            Location location = new Location();
474            location.setDataFileId(data.getFileId());
475            location.setOffset(data.getOffset());
476            return location;
477        }
478    
479        /**
480         * Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the
481         * transaction log and then the cache is updated.
482         * 
483         * @param listener
484         * @throws Exception
485         */
486        public void recover(final MessageRecoveryListener listener) throws Exception {
487            flush();
488            referenceStore.recover(new RecoveryListenerAdapter(this, listener));
489        }
490    
491        public void start() throws Exception {
492            referenceStore.start();
493        }
494    
495        public void stop() throws Exception {
496            flush();
497            asyncWriteTask.shutdown();
498            referenceStore.stop();
499        }
500    
501        /**
502         * @return Returns the longTermStore.
503         */
504        public ReferenceStore getReferenceStore() {
505            return referenceStore;
506        }
507    
508        /**
509         * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
510         */
511        public void removeAllMessages(ConnectionContext context) throws IOException {
512            flush();
513            referenceStore.removeAllMessages(context);
514        }
515    
516        public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime,
517                String messageRef) throws IOException {
518            throw new IOException("The journal does not support message references.");
519        }
520    
521        public String getMessageReference(MessageId identity) throws IOException {
522            throw new IOException("The journal does not support message references.");
523        }
524    
525        /**
526         * @return
527         * @throws IOException
528         * @see org.apache.activemq.store.MessageStore#getMessageCount()
529         */
530        public int getMessageCount() throws IOException {
531            flush();
532            return referenceStore.getMessageCount();
533        }
534    
535        public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
536            RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
537            referenceStore.recoverNextMessages(maxReturned, recoveryListener);
538            if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
539                flush();
540                referenceStore.recoverNextMessages(maxReturned, recoveryListener);
541            }
542        }
543    
544        Message getMessage(ReferenceData data) throws IOException {
545            Location location = new Location();
546            location.setDataFileId(data.getFileId());
547            location.setOffset(data.getOffset());
548            DataStructure rc = peristenceAdapter.readCommand(location);
549            try {
550                return (Message) rc;
551            } catch (ClassCastException e) {
552                throw new IOException("Could not read message  at location " + location + ", expected a message, but got: "
553                        + rc);
554            }
555        }
556    
557        public void resetBatching() {
558            referenceStore.resetBatching();
559        }
560    
561        public Location getMark() {
562            return mark.get();
563        }
564    
565        public void dispose(ConnectionContext context) {
566            try {
567                flush();
568            } catch (InterruptedIOException e) {
569                Thread.currentThread().interrupt();
570            }
571            referenceStore.dispose(context);
572            super.dispose(context);
573        }
574    
575        public void setBatch(MessageId messageId) {
576            try {
577                flush();
578            } catch (InterruptedIOException e) {
579                LOG.debug("flush on setBatch resulted in exception", e);
580            }
581            getReferenceStore().setBatch(messageId);
582        }
583    
584    }