001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.amq;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.io.RandomAccessFile;
022    import java.nio.channels.FileLock;
023    import java.util.Date;
024    import java.util.HashSet;
025    import java.util.Iterator;
026    import java.util.Map;
027    import java.util.Set;
028    import java.util.concurrent.ConcurrentHashMap;
029    import java.util.concurrent.CountDownLatch;
030    import java.util.concurrent.atomic.AtomicBoolean;
031    import java.util.concurrent.atomic.AtomicInteger;
032    import java.util.concurrent.atomic.AtomicLong;
033    
034    import org.apache.activeio.journal.Journal;
035    import org.apache.activemq.broker.BrokerService;
036    import org.apache.activemq.broker.BrokerServiceAware;
037    import org.apache.activemq.broker.ConnectionContext;
038    import org.apache.activemq.command.ActiveMQDestination;
039    import org.apache.activemq.command.ActiveMQQueue;
040    import org.apache.activemq.command.ActiveMQTopic;
041    import org.apache.activemq.command.DataStructure;
042    import org.apache.activemq.command.JournalQueueAck;
043    import org.apache.activemq.command.JournalTopicAck;
044    import org.apache.activemq.command.JournalTrace;
045    import org.apache.activemq.command.JournalTransaction;
046    import org.apache.activemq.command.Message;
047    import org.apache.activemq.command.ProducerId;
048    import org.apache.activemq.command.SubscriptionInfo;
049    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
050    import org.apache.activemq.kaha.impl.async.AsyncDataManager;
051    import org.apache.activemq.kaha.impl.async.Location;
052    import org.apache.activemq.kaha.impl.index.hash.HashIndex;
053    import org.apache.activemq.openwire.OpenWireFormat;
054    import org.apache.activemq.store.JournaledStore;
055    import org.apache.activemq.store.MessageStore;
056    import org.apache.activemq.store.PersistenceAdapter;
057    import org.apache.activemq.store.ReferenceStore;
058    import org.apache.activemq.store.ReferenceStoreAdapter;
059    import org.apache.activemq.store.TopicMessageStore;
060    import org.apache.activemq.store.TopicReferenceStore;
061    import org.apache.activemq.store.TransactionStore;
062    import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
063    import org.apache.activemq.thread.Scheduler;
064    import org.apache.activemq.thread.Task;
065    import org.apache.activemq.thread.TaskRunner;
066    import org.apache.activemq.thread.TaskRunnerFactory;
067    import org.apache.activemq.usage.SystemUsage;
068    import org.apache.activemq.usage.Usage;
069    import org.apache.activemq.usage.UsageListener;
070    import org.apache.activemq.util.ByteSequence;
071    import org.apache.activemq.util.IOExceptionSupport;
072    import org.apache.activemq.util.IOHelper;
073    import org.apache.activemq.wireformat.WireFormat;
074    import org.slf4j.Logger;
075    import org.slf4j.LoggerFactory;
076    
077    
078    /**
079     * An implementation of {@link PersistenceAdapter} designed for use with a
080     * {@link Journal} and then check pointing asynchronously on a timeout with some
081     * other long term persistent storage.
082     *
083     * @org.apache.xbean.XBean element="amqPersistenceAdapter"
084     *
085     */
086    @Deprecated
087    public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware, JournaledStore {
088    
    private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
    // Runs the periodic checkpoint and cleanup tasks; assigned in start().
    private Scheduler scheduler;
    // Message stores created so far, keyed by destination.
    private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
    private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
    private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
    // These two constants have no initializer here; they are assigned elsewhere in the class.
    private static final boolean BROKEN_FILE_LOCK;
    private static final boolean DISABLE_LOCKING;
    // Milliseconds slept between attempts to lock the journal in start().
    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
    // The journal; created in start() when none has been supplied.
    private AsyncDataManager asyncDataManager;
    // Created in start() when none has been supplied; set to null again by stop().
    private ReferenceStoreAdapter referenceStoreAdapter;
    // Taken from the broker service when one is set, otherwise created in start().
    private TaskRunnerFactory taskRunnerFactory;
    // Used to marshal and unmarshal the records written to / read from the journal.
    private WireFormat wireFormat = new OpenWireFormat();
    private SystemUsage usageManager;
    // Presumably the period handed to the scheduler for the periodic checkpoint task
    // (via getCheckpointInterval()) -- TODO confirm units against Scheduler.
    private long checkpointInterval = 1000 * 20;
    private int maxCheckpointMessageAddSize = 1024 * 4;
    // Single transaction store instance handed out by createTransactionStore().
    private final AMQTransactionStore transactionStore = new AMQTransactionStore(this);
    // Task runner that executes doCheckpoint(); created in start().
    private TaskRunner checkpointTask;
    // Counted down when a doCheckpoint() run finishes; replaced at the start of every run.
    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
    // Makes start() and stop() no-ops when the adapter is already in the requested state.
    private final AtomicBoolean started = new AtomicBoolean(false);
    private Runnable periodicCheckpointTask;
    private Runnable periodicCleanupTask;
    // When set, start() deletes the journal and the reference store contents, then clears the flag.
    private boolean deleteAllMessages;
    private boolean syncOnWrite;
    private String brokerName = "";
    // Root directory of the store; defaulted in start() when not configured.
    private File directory;
    // Defaults to "archive" below the store directory when not configured.
    private File directoryArchive;
    private BrokerService brokerService;
    private final AtomicLong storeSize = new AtomicLong();
    private boolean persistentIndex=true;
    private boolean useNio = true;
    private boolean archiveDataLogs=false;
    private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
    private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
    private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
    private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
    private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
    private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
    private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
    // The key sets of the inner maps are treated by cleanup() as data file ids that must be kept.
    private final Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
    // Handle on the "lock" file inside the store directory; opened in start(), closed in stop().
    private RandomAccessFile lockFile;
    private FileLock lock;
    private boolean disableLocking = DISABLE_LOCKING;
    // When true, start() makes a single attempt to lock the journal instead of retrying.
    private boolean failIfJournalIsLocked;
    // Read at the end of start() to decide whether to announce the acquired lock;
    // both flags are written outside the code visible here.
    private boolean lockLogged;
    private boolean lockAquired;
    private boolean recoverReferenceStore=true;
    private boolean forceRecoverReferenceStore=false;
    private boolean useDedicatedTaskRunner=false;
    private int journalThreadPriority = Thread.MAX_PRIORITY;
139    
140        public String getBrokerName() {
141            return this.brokerName;
142        }
143    
144        @Override
145        public void setBrokerName(String brokerName) {
146            this.brokerName = brokerName;
147            if (this.referenceStoreAdapter != null) {
148                this.referenceStoreAdapter.setBrokerName(brokerName);
149            }
150        }
151    
152        public BrokerService getBrokerService() {
153            return brokerService;
154        }
155    
156        @Override
157        public void setBrokerService(BrokerService brokerService) {
158            this.brokerService = brokerService;
159        }
160    
161        @Override
162        public synchronized void start() throws Exception {
163            if (!started.compareAndSet(false, true)) {
164                return;
165            }
166            if (this.directory == null) {
167                if (brokerService != null) {
168                    this.directory = brokerService.getBrokerDataDirectory();
169    
170                } else {
171                    this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
172                    this.directory = new File(directory, "amqstore");
173                    directory.getAbsolutePath();
174                }
175            }
176            if (this.directoryArchive == null) {
177                this.directoryArchive = new File(this.directory,"archive");
178            }
179            if (this.brokerService != null) {
180                this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory();
181                this.scheduler = this.brokerService.getScheduler();
182            } else {
183                this.taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(),
184                    true, 1000, isUseDedicatedTaskRunner());
185                this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler");
186            }
187    
188            IOHelper.mkdirs(this.directory);
189            lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
190            lock();
191            LOG.info("AMQStore starting using directory: " + directory);
192            if (archiveDataLogs) {
193                IOHelper.mkdirs(this.directoryArchive);
194            }
195    
196            if (this.usageManager != null) {
197                this.usageManager.getMemoryUsage().addUsageListener(this);
198            }
199            if (asyncDataManager == null) {
200                asyncDataManager = createAsyncDataManager();
201            }
202            if (referenceStoreAdapter == null) {
203                referenceStoreAdapter = createReferenceStoreAdapter();
204            }
205            referenceStoreAdapter.setDirectory(new File(directory, "kr-store"));
206            referenceStoreAdapter.setBrokerName(getBrokerName());
207            referenceStoreAdapter.setUsageManager(usageManager);
208            referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength());
209    
210            if (failIfJournalIsLocked) {
211                asyncDataManager.lock();
212            } else {
213                while (true) {
214                    try {
215                        asyncDataManager.lock();
216                        break;
217                    } catch (IOException e) {
218                        LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e);
219                        try {
220                            Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
221                        } catch (InterruptedException e1) {
222                        }
223                    }
224                }
225            }
226    
227            asyncDataManager.start();
228            if (deleteAllMessages) {
229                asyncDataManager.delete();
230                try {
231                    JournalTrace trace = new JournalTrace();
232                    trace.setMessage("DELETED " + new Date());
233                    Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
234                    asyncDataManager.setMark(location, true);
235                    LOG.info("Journal deleted: ");
236                    deleteAllMessages = false;
237                } catch (IOException e) {
238                    throw e;
239                } catch (Throwable e) {
240                    throw IOExceptionSupport.create(e);
241                }
242                referenceStoreAdapter.deleteAllMessages();
243            }
244            referenceStoreAdapter.start();
245            Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
246            LOG.info("Active data files: " + files);
247            checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
248    
249                @Override
250                public boolean iterate() {
251                    doCheckpoint();
252                    return false;
253                }
254            }, "ActiveMQ Journal Checkpoint Worker");
255            createTransactionStore();
256    
257            //
258            // The following was attempting to reduce startup times by avoiding the
259            // log
260            // file scanning that recovery performs. The problem with it is that XA
261            // transactions
262            // only live in transaction log and are not stored in the reference
263            // store, but they still
264            // need to be recovered when the broker starts up.
265    
266            if (isForceRecoverReferenceStore()
267                    || (isRecoverReferenceStore() && !referenceStoreAdapter
268                            .isStoreValid())) {
269                LOG.warn("The ReferenceStore is not valid - recovering ...");
270                recover();
271                LOG.info("Finished recovering the ReferenceStore");
272            } else {
273                Location location = writeTraceMessage("RECOVERED " + new Date(),
274                        true);
275                asyncDataManager.setMark(location, true);
276                // recover transactions
277                getTransactionStore().setPreparedTransactions(
278                        referenceStoreAdapter.retrievePreparedState());
279            }
280    
281            // Do a checkpoint periodically.
282            periodicCheckpointTask = new Runnable() {
283    
284                @Override
285                public void run() {
286                    checkpoint(false);
287                }
288            };
289            scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
290            periodicCleanupTask = new Runnable() {
291    
292                @Override
293                public void run() {
294                    cleanup();
295                }
296            };
297            scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
298    
299            if (lockAquired && lockLogged) {
300                LOG.info("Aquired lock for AMQ Store" + getDirectory());
301                if (brokerService != null) {
302                    brokerService.getBroker().nowMasterBroker();
303                }
304            }
305    
306        }
307    
308        @Override
309        public void stop() throws Exception {
310    
311            if (!started.compareAndSet(true, false)) {
312                return;
313            }
314            unlock();
315            if (lockFile != null) {
316                lockFile.close();
317                lockFile = null;
318            }
319            this.usageManager.getMemoryUsage().removeUsageListener(this);
320            synchronized (this) {
321                scheduler.cancel(periodicCheckpointTask);
322                scheduler.cancel(periodicCleanupTask);
323            }
324            Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
325            while (queueIterator.hasNext()) {
326                AMQMessageStore ms = queueIterator.next();
327                ms.stop();
328            }
329            Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
330            while (topicIterator.hasNext()) {
331                final AMQTopicMessageStore ms = topicIterator.next();
332                ms.stop();
333            }
334            // Take one final checkpoint and stop checkpoint processing.
335            checkpoint(true);
336            synchronized (this) {
337                checkpointTask.shutdown();
338            }
339            referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
340            queues.clear();
341            topics.clear();
342            IOException firstException = null;
343            referenceStoreAdapter.stop();
344            referenceStoreAdapter = null;
345    
346            if (this.brokerService == null) {
347                this.taskRunnerFactory.shutdown();
348                this.scheduler.stop();
349            }
350            try {
351                LOG.debug("Journal close");
352                asyncDataManager.close();
353            } catch (Exception e) {
354                firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
355            }
356            if (firstException != null) {
357                throw firstException;
358            }
359        }
360    
361        /**
362         * When we checkpoint we move all the journalled data to long term storage.
363         *
364         * @param sync
365         */
366        @Override
367        public void checkpoint(boolean sync) {
368            try {
369                if (asyncDataManager == null) {
370                    throw new IllegalStateException("Journal is closed.");
371                }
372                CountDownLatch latch = null;
373                synchronized (this) {
374                    latch = nextCheckpointCountDownLatch;
375                    checkpointTask.wakeup();
376                }
377                if (sync) {
378                    if (LOG.isDebugEnabled()) {
379                        LOG.debug("Waitng for checkpoint to complete.");
380                    }
381                    latch.await();
382                }
383                referenceStoreAdapter.checkpoint(sync);
384            } catch (InterruptedException e) {
385                Thread.currentThread().interrupt();
386                LOG.warn("Request to start checkpoint failed: " + e, e);
387            } catch (IOException e) {
388                LOG.error("checkpoint failed: " + e, e);
389            }
390        }
391    
392        /**
393         * This does the actual checkpoint.
394         *
395         * @return true if successful
396         */
397        public boolean doCheckpoint() {
398            CountDownLatch latch = null;
399            synchronized (this) {
400                latch = nextCheckpointCountDownLatch;
401                nextCheckpointCountDownLatch = new CountDownLatch(1);
402            }
403            try {
404                if (LOG.isDebugEnabled()) {
405                    LOG.debug("Checkpoint started.");
406                }
407    
408                Location currentMark = asyncDataManager.getMark();
409                Location newMark = currentMark;
410                Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
411                while (queueIterator.hasNext()) {
412                    final AMQMessageStore ms = queueIterator.next();
413                    Location mark = ms.getMark();
414                    if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
415                        newMark = mark;
416                    }
417                }
418                Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
419                while (topicIterator.hasNext()) {
420                    final AMQTopicMessageStore ms = topicIterator.next();
421                    Location mark = ms.getMark();
422                    if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
423                        newMark = mark;
424                    }
425                }
426                try {
427                    if (newMark != currentMark) {
428                        if (LOG.isDebugEnabled()) {
429                            LOG.debug("Marking journal at: " + newMark);
430                        }
431                        asyncDataManager.setMark(newMark, false);
432                        writeTraceMessage("CHECKPOINT " + new Date(), true);
433                    }
434                } catch (Exception e) {
435                    LOG.error("Failed to mark the Journal: " + e, e);
436                }
437                if (LOG.isDebugEnabled()) {
438                    LOG.debug("Checkpoint done.");
439                }
440            } finally {
441                latch.countDown();
442            }
443            return true;
444        }
445    
446        /**
447         * Cleans up the data files
448         * @throws IOException
449         */
450        public void cleanup() {
451            try {
452                Set<Integer>inProgress = new HashSet<Integer>();
453                if (LOG.isDebugEnabled()) {
454                    LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values());
455                }
456                for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
457                    inProgress.addAll(set.keySet());
458                }
459                Integer lastDataFile = asyncDataManager.getCurrentDataFileId();
460                inProgress.add(lastDataFile);
461                lastDataFile = asyncDataManager.getMark().getDataFileId();
462                inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse());
463                Location lastActiveTx = transactionStore.checkpoint();
464                if (lastActiveTx != null) {
465                    lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
466                }
467                LOG.debug("lastDataFile: " + lastDataFile);
468                asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
469            } catch (IOException e) {
470                LOG.error("Could not cleanup data files: " + e, e);
471            }
472        }
473    
474        @Override
475        public Set<ActiveMQDestination> getDestinations() {
476            Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
477            destinations.addAll(queues.keySet());
478            destinations.addAll(topics.keySet());
479            return destinations;
480        }
481    
482        MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
483            if (destination.isQueue()) {
484                return createQueueMessageStore((ActiveMQQueue)destination);
485            } else {
486                return createTopicMessageStore((ActiveMQTopic)destination);
487            }
488        }
489    
490        @Override
491        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
492            AMQMessageStore store = queues.get(destination);
493            if (store == null) {
494                ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
495                store = new AMQMessageStore(this, checkpointStore, destination);
496                try {
497                    store.start();
498                } catch (Exception e) {
499                    throw IOExceptionSupport.create(e);
500                }
501                queues.put(destination, store);
502            }
503            return store;
504        }
505    
506        @Override
507        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
508            AMQTopicMessageStore store = topics.get(destinationName);
509            if (store == null) {
510                TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
511                store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
512                try {
513                    store.start();
514                } catch (Exception e) {
515                    throw IOExceptionSupport.create(e);
516                }
517                topics.put(destinationName, store);
518            }
519            return store;
520        }
521    
522        /**
523         * Cleanup method to remove any state associated with the given destination
524         *
525         * @param destination
526         */
527        @Override
528        public void removeQueueMessageStore(ActiveMQQueue destination) {
529            AMQMessageStore store= queues.remove(destination);
530            referenceStoreAdapter.removeQueueMessageStore(destination);
531        }
532    
533        /**
534         * Cleanup method to remove any state associated with the given destination
535         *
536         * @param destination
537         */
538        @Override
539        public void removeTopicMessageStore(ActiveMQTopic destination) {
540            topics.remove(destination);
541        }
542    
543        @Override
544        public TransactionStore createTransactionStore() throws IOException {
545            return transactionStore;
546        }
547    
548        @Override
549        public long getLastMessageBrokerSequenceId() throws IOException {
550            return referenceStoreAdapter.getLastMessageBrokerSequenceId();
551        }
552    
553        @Override
554        public void beginTransaction(ConnectionContext context) throws IOException {
555            referenceStoreAdapter.beginTransaction(context);
556        }
557    
558        @Override
559        public void commitTransaction(ConnectionContext context) throws IOException {
560            referenceStoreAdapter.commitTransaction(context);
561        }
562    
563        @Override
564        public void rollbackTransaction(ConnectionContext context) throws IOException {
565            referenceStoreAdapter.rollbackTransaction(context);
566        }
567    
568        public boolean isPersistentIndex() {
569            return persistentIndex;
570        }
571    
572        public void setPersistentIndex(boolean persistentIndex) {
573            this.persistentIndex = persistentIndex;
574        }
575    
576        /**
577         * @param location
578         * @return
579         * @throws IOException
580         */
581        public DataStructure readCommand(Location location) throws IOException {
582            try {
583                ByteSequence packet = asyncDataManager.read(location);
584                return (DataStructure)wireFormat.unmarshal(packet);
585            } catch (IOException e) {
586                throw createReadException(location, e);
587            }
588        }
589    
590        /**
591         * Move all the messages that were in the journal into long term storage. We
592         * just replay and do a checkpoint.
593         *
594         * @throws IOException
595         * @throws IOException
596         * @throws IllegalStateException
597         */
598        private void recover() throws IllegalStateException, IOException {
599            referenceStoreAdapter.clearMessages();
600            Location pos = null;
601            int redoCounter = 0;
602            LOG.info("Journal Recovery Started from: " + asyncDataManager);
603            long start = System.currentTimeMillis();
604            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
605            // While we have records in the journal.
606            while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
607                ByteSequence data = asyncDataManager.read(pos);
608                DataStructure c = (DataStructure)wireFormat.unmarshal(data);
609                if (c instanceof Message) {
610                    Message message = (Message)c;
611                    AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination());
612                    if (message.isInTransaction()) {
613                        transactionStore.addMessage(store, message, pos);
614                    } else {
615                        if (store.replayAddMessage(context, message, pos)) {
616                            redoCounter++;
617                        }
618                    }
619                } else {
620                    switch (c.getDataStructureType()) {
621                    case SubscriptionInfo.DATA_STRUCTURE_TYPE: {
622                        referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c);
623                    }
624                        break;
625                    case JournalQueueAck.DATA_STRUCTURE_TYPE: {
626                        JournalQueueAck command = (JournalQueueAck)c;
627                        AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());
628                        if (command.getMessageAck().isInTransaction()) {
629                            transactionStore.removeMessage(store, command.getMessageAck(), pos);
630                        } else {
631                            if (store.replayRemoveMessage(context, command.getMessageAck())) {
632                                redoCounter++;
633                            }
634                        }
635                    }
636                        break;
637                    case JournalTopicAck.DATA_STRUCTURE_TYPE: {
638                        JournalTopicAck command = (JournalTopicAck)c;
639                        AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination());
640                        if (command.getTransactionId() != null) {
641                            transactionStore.acknowledge(store, command, pos);
642                        } else {
643                            if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) {
644                                redoCounter++;
645                            }
646                        }
647                    }
648                        break;
649                    case JournalTransaction.DATA_STRUCTURE_TYPE: {
650                        JournalTransaction command = (JournalTransaction)c;
651                        try {
652                            // Try to replay the packet.
653                            switch (command.getType()) {
654                            case JournalTransaction.XA_PREPARE:
655                                transactionStore.replayPrepare(command.getTransactionId());
656                                break;
657                            case JournalTransaction.XA_COMMIT:
658                            case JournalTransaction.LOCAL_COMMIT:
659                                AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
660                                if (tx == null) {
661                                    break; // We may be trying to replay a commit
662                                }
663                                // that
664                                // was already committed.
665                                // Replay the committed operations.
666                                tx.getOperations();
667                                for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
668                                    AMQTxOperation op = (AMQTxOperation)iter.next();
669                                    if (op.replay(this, context)) {
670                                        redoCounter++;
671                                    }
672                                }
673                                break;
674                            case JournalTransaction.LOCAL_ROLLBACK:
675                            case JournalTransaction.XA_ROLLBACK:
676                                transactionStore.replayRollback(command.getTransactionId());
677                                break;
678                            default:
679                                throw new IOException("Invalid journal command type: " + command.getType());
680                            }
681                        } catch (IOException e) {
682                            LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
683                        }
684                    }
685                        break;
686                    case JournalTrace.DATA_STRUCTURE_TYPE:
687                        JournalTrace trace = (JournalTrace)c;
688                        LOG.debug("TRACE Entry: " + trace.getMessage());
689                        break;
690                    default:
691                        LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
692                    }
693                }
694            }
695            Location location = writeTraceMessage("RECOVERED " + new Date(), true);
696            asyncDataManager.setMark(location, true);
697            long end = System.currentTimeMillis();
698            LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
699        }
700    
701        private IOException createReadException(Location location, Exception e) {
702            return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
703        }
704    
705        protected IOException createWriteException(DataStructure packet, Exception e) {
706            return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
707        }
708    
709        protected IOException createWriteException(String command, Exception e) {
710            return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
711        }
712    
713        protected IOException createRecoveryFailedException(Exception e) {
714            return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
715        }
716    
717        /**
718         * @param command
719         * @param syncHint
720         * @return
721         * @throws IOException
722         */
723        public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
724            return writeCommand(command, syncHint,false);
725        }
726    
727        public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
728            try {
729                return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
730            } catch (IOException ioe) {
731                LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
732                brokerService.handleIOException(ioe);
733                throw ioe;
734            }
735        }
736    
737        private Location writeTraceMessage(String message, boolean sync) throws IOException {
738            JournalTrace trace = new JournalTrace();
739            trace.setMessage(message);
740            return writeCommand(trace, sync);
741        }
742    
743        @Override
744        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
745            newPercentUsage = (newPercentUsage / 10) * 10;
746            oldPercentUsage = (oldPercentUsage / 10) * 10;
747            if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
748                checkpoint(false);
749            }
750        }
751    
752        public AMQTransactionStore getTransactionStore() {
753            return transactionStore;
754        }
755    
756        @Override
757        public synchronized void deleteAllMessages() throws IOException {
758            deleteAllMessages = true;
759        }
760    
761        @Override
762        public String toString() {
763            return "AMQPersistenceAdapter(" + directory + ")";
764        }
765    
766        // /////////////////////////////////////////////////////////////////
767        // Subclass overridables
768        // /////////////////////////////////////////////////////////////////
769        protected AsyncDataManager createAsyncDataManager() {
770            AsyncDataManager manager = new AsyncDataManager(storeSize);
771            manager.setDirectory(new File(directory, "journal"));
772            manager.setDirectoryArchive(getDirectoryArchive());
773            manager.setArchiveDataLogs(isArchiveDataLogs());
774            manager.setMaxFileLength(maxFileLength);
775            manager.setUseNio(useNio);
776            return manager;
777        }
778    
779        protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
780            KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
781            adaptor.setPersistentIndex(isPersistentIndex());
782            adaptor.setIndexBinSize(getIndexBinSize());
783            adaptor.setIndexKeySize(getIndexKeySize());
784            adaptor.setIndexPageSize(getIndexPageSize());
785            adaptor.setIndexMaxBinSize(getIndexMaxBinSize());
786            adaptor.setIndexLoadFactor(getIndexLoadFactor());
787            return adaptor;
788        }
789    
790        // /////////////////////////////////////////////////////////////////
791        // Property Accessors
792        // /////////////////////////////////////////////////////////////////
793        public AsyncDataManager getAsyncDataManager() {
794            return asyncDataManager;
795        }
796    
797        public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
798            this.asyncDataManager = asyncDataManager;
799        }
800    
801        public ReferenceStoreAdapter getReferenceStoreAdapter() {
802            return referenceStoreAdapter;
803        }
804    
805        public TaskRunnerFactory getTaskRunnerFactory() {
806            return taskRunnerFactory;
807        }
808    
809        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
810            this.taskRunnerFactory = taskRunnerFactory;
811        }
812    
813        /**
814         * @return Returns the wireFormat.
815         */
816        public WireFormat getWireFormat() {
817            return wireFormat;
818        }
819    
820        public void setWireFormat(WireFormat wireFormat) {
821            this.wireFormat = wireFormat;
822        }
823    
824        public SystemUsage getUsageManager() {
825            return usageManager;
826        }
827    
828        @Override
829        public void setUsageManager(SystemUsage usageManager) {
830            this.usageManager = usageManager;
831        }
832    
833        public int getMaxCheckpointMessageAddSize() {
834            return maxCheckpointMessageAddSize;
835        }
836    
837        /**
838         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
839         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
840         */
841        public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
842            this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
843        }
844    
845    
846        @Override
847        public synchronized File getDirectory() {
848            return directory;
849        }
850    
851        @Override
852        public synchronized void setDirectory(File directory) {
853            this.directory = directory;
854        }
855    
856        public boolean isSyncOnWrite() {
857            return this.syncOnWrite;
858        }
859    
860        public void setSyncOnWrite(boolean syncOnWrite) {
861            this.syncOnWrite = syncOnWrite;
862        }
863    
864        /**
865         * @param referenceStoreAdapter the referenceStoreAdapter to set
866         */
867        public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
868            this.referenceStoreAdapter = referenceStoreAdapter;
869        }
870    
871        @Override
872        public long size(){
873            return storeSize.get();
874        }
875    
876        public boolean isUseNio() {
877            return useNio;
878        }
879    
880        public void setUseNio(boolean useNio) {
881            this.useNio = useNio;
882        }
883    
884        public int getMaxFileLength() {
885            return maxFileLength;
886        }
887    
888         /**
889          * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
890          * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
891          */
892        public void setMaxFileLength(int maxFileLength) {
893            this.maxFileLength = maxFileLength;
894        }
895    
896        public long getCleanupInterval() {
897            return cleanupInterval;
898        }
899    
900        public void setCleanupInterval(long cleanupInterval) {
901            this.cleanupInterval = cleanupInterval;
902        }
903    
904        public long getCheckpointInterval() {
905            return checkpointInterval;
906        }
907    
908        public void setCheckpointInterval(long checkpointInterval) {
909            this.checkpointInterval = checkpointInterval;
910        }
911    
912        public int getIndexBinSize() {
913            return indexBinSize;
914        }
915    
916        public void setIndexBinSize(int indexBinSize) {
917            this.indexBinSize = indexBinSize;
918        }
919    
920        public int getIndexKeySize() {
921            return indexKeySize;
922        }
923    
924        public void setIndexKeySize(int indexKeySize) {
925            this.indexKeySize = indexKeySize;
926        }
927    
928        public int getIndexPageSize() {
929            return indexPageSize;
930        }
931    
932        public int getIndexMaxBinSize() {
933            return indexMaxBinSize;
934        }
935    
936        public void setIndexMaxBinSize(int maxBinSize) {
937            this.indexMaxBinSize = maxBinSize;
938        }
939    
940        /**
941         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
942         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
943         */
944        public void setIndexPageSize(int indexPageSize) {
945            this.indexPageSize = indexPageSize;
946        }
947    
948        public void setIndexLoadFactor(int factor){
949            this.indexLoadFactor=factor;
950        }
951    
952        public int getIndexLoadFactor(){
953            return this.indexLoadFactor;
954        }
955    
956        public int getMaxReferenceFileLength() {
957            return maxReferenceFileLength;
958        }
959    
960        /**
961         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
962         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
963         */
964        public void setMaxReferenceFileLength(int maxReferenceFileLength) {
965            this.maxReferenceFileLength = maxReferenceFileLength;
966        }
967    
968        public File getDirectoryArchive() {
969            return directoryArchive;
970        }
971    
972        public void setDirectoryArchive(File directoryArchive) {
973            this.directoryArchive = directoryArchive;
974        }
975    
976        public boolean isArchiveDataLogs() {
977            return archiveDataLogs;
978        }
979    
980        public void setArchiveDataLogs(boolean archiveDataLogs) {
981            this.archiveDataLogs = archiveDataLogs;
982        }
983    
984        public boolean isDisableLocking() {
985            return disableLocking;
986        }
987    
988        public void setDisableLocking(boolean disableLocking) {
989            this.disableLocking = disableLocking;
990        }
991    
992        /**
993         * @return the recoverReferenceStore
994         */
995        public boolean isRecoverReferenceStore() {
996            return recoverReferenceStore;
997        }
998    
999        /**
1000         * @param recoverReferenceStore the recoverReferenceStore to set
1001         */
1002        public void setRecoverReferenceStore(boolean recoverReferenceStore) {
1003            this.recoverReferenceStore = recoverReferenceStore;
1004        }
1005    
1006        /**
1007         * @return the forceRecoverReferenceStore
1008         */
1009        public boolean isForceRecoverReferenceStore() {
1010            return forceRecoverReferenceStore;
1011        }
1012    
1013        /**
1014         * @param forceRecoverReferenceStore the forceRecoverReferenceStore to set
1015         */
1016        public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) {
1017            this.forceRecoverReferenceStore = forceRecoverReferenceStore;
1018        }
1019    
1020        public boolean isUseDedicatedTaskRunner() {
1021            return useDedicatedTaskRunner;
1022        }
1023    
1024        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1025            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1026        }
1027    
1028        /**
1029         * @return the journalThreadPriority
1030         */
1031        public int getJournalThreadPriority() {
1032            return this.journalThreadPriority;
1033        }
1034    
1035        /**
1036         * @param journalThreadPriority the journalThreadPriority to set
1037         */
1038        public void setJournalThreadPriority(int journalThreadPriority) {
1039            this.journalThreadPriority = journalThreadPriority;
1040        }
1041    
1042    
1043        protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
1044            Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1045            if (map == null) {
1046                map = new ConcurrentHashMap<Integer, AtomicInteger>();
1047                dataFilesInProgress.put(store, map);
1048            }
1049            AtomicInteger count = map.get(dataFileId);
1050            if (count == null) {
1051                count = new AtomicInteger(0);
1052                map.put(dataFileId, count);
1053            }
1054            count.incrementAndGet();
1055        }
1056    
1057        protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
1058            Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1059            if (map != null) {
1060                AtomicInteger count = map.get(dataFileId);
1061                if (count != null) {
1062                    int newCount = count.decrementAndGet();
1063                    if (newCount <=0) {
1064                        map.remove(dataFileId);
1065                    }
1066                }
1067                if (map.isEmpty()) {
1068                    dataFilesInProgress.remove(store);
1069                }
1070            }
1071        }
1072    
1073    
1074        protected void lock() throws Exception {
1075            lockLogged = false;
1076            lockAquired = false;
1077            do {
1078                if (doLock()) {
1079                    lockAquired = true;
1080                } else {
1081                    if (!lockLogged) {
1082                        LOG.warn("Waiting to Lock the Store " + getDirectory());
1083                        lockLogged = true;
1084                    }
1085                    Thread.sleep(1000);
1086                }
1087    
1088            } while (!lockAquired && !disableLocking);
1089        }
1090    
1091        private synchronized void unlock() throws IOException {
1092            if (!disableLocking && (null != lock)) {
1093                //clear property doesn't work on some platforms
1094                System.getProperties().remove(getPropertyKey());
1095                System.clearProperty(getPropertyKey());
1096                assert(System.getProperty(getPropertyKey())==null);
1097                if (lock.isValid()) {
1098                    lock.release();
1099                    lock.channel().close();
1100    
1101                }
1102                lock = null;
1103            }
1104        }
1105    
1106    
1107        protected boolean doLock() throws IOException {
1108            boolean result = true;
1109            if (!disableLocking && directory != null && lock == null) {
1110                String key = getPropertyKey();
1111                String property = System.getProperty(key);
1112                if (null == property) {
1113                    if (!BROKEN_FILE_LOCK) {
1114                        lock = lockFile.getChannel().tryLock(0, Math.max(1, lockFile.getChannel().size()), false);
1115                        if (lock == null) {
1116                            result = false;
1117                        } else {
1118                            System.setProperty(key, new Date().toString());
1119                        }
1120                    }
1121                } else { // already locked
1122                    result = false;
1123                }
1124            }
1125            return result;
1126        }
1127    
1128        private String getPropertyKey() throws IOException {
1129            return getClass().getName() + ".lock." + directory.getCanonicalPath();
1130        }
1131    
1132        static {
1133            BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
1134                    + ".FileLockBroken",
1135                    "false"));
1136            DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
1137                   + ".DisableLocking",
1138                   "false"));
1139        }
1140    
1141    
1142        @Override
1143        public long getLastProducerSequenceId(ProducerId id) {
1144            // reference store send has adequate duplicate suppression
1145            return -1;
1146        }
1147    
1148        @Override
1149        public int getJournalMaxFileLength() {
1150            return getMaxFileLength();
1151        }
1152    }