001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.amq;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.io.RandomAccessFile;
022    import java.nio.channels.FileLock;
023    import java.util.Date;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.Iterator;
027    import java.util.Map;
028    import java.util.Set;
029    import java.util.concurrent.ConcurrentHashMap;
030    import java.util.concurrent.CountDownLatch;
031    import java.util.concurrent.atomic.AtomicBoolean;
032    import java.util.concurrent.atomic.AtomicInteger;
033    import java.util.concurrent.atomic.AtomicLong;
034    import org.apache.activeio.journal.Journal;
035    import org.apache.activemq.broker.BrokerService;
036    import org.apache.activemq.broker.BrokerServiceAware;
037    import org.apache.activemq.broker.ConnectionContext;
038    import org.apache.activemq.command.ActiveMQDestination;
039    import org.apache.activemq.command.ActiveMQQueue;
040    import org.apache.activemq.command.ActiveMQTopic;
041    import org.apache.activemq.command.DataStructure;
042    import org.apache.activemq.command.JournalQueueAck;
043    import org.apache.activemq.command.JournalTopicAck;
044    import org.apache.activemq.command.JournalTrace;
045    import org.apache.activemq.command.JournalTransaction;
046    import org.apache.activemq.command.Message;
047    import org.apache.activemq.command.ProducerId;
048    import org.apache.activemq.command.SubscriptionInfo;
049    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
050    import org.apache.activemq.kaha.impl.async.AsyncDataManager;
051    import org.apache.activemq.kaha.impl.async.Location;
052    import org.apache.activemq.kaha.impl.index.hash.HashIndex;
053    import org.apache.activemq.openwire.OpenWireFormat;
054    import org.apache.activemq.store.MessageStore;
055    import org.apache.activemq.store.PersistenceAdapter;
056    import org.apache.activemq.store.ReferenceStore;
057    import org.apache.activemq.store.ReferenceStoreAdapter;
058    import org.apache.activemq.store.TopicMessageStore;
059    import org.apache.activemq.store.TopicReferenceStore;
060    import org.apache.activemq.store.TransactionStore;
061    import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter;
062    import org.apache.activemq.thread.Scheduler;
063    import org.apache.activemq.thread.Task;
064    import org.apache.activemq.thread.TaskRunner;
065    import org.apache.activemq.thread.TaskRunnerFactory;
066    import org.apache.activemq.usage.SystemUsage;
067    import org.apache.activemq.usage.Usage;
068    import org.apache.activemq.usage.UsageListener;
069    import org.apache.activemq.util.ByteSequence;
070    import org.apache.activemq.util.IOExceptionSupport;
071    import org.apache.activemq.util.IOHelper;
072    import org.apache.activemq.wireformat.WireFormat;
073    import org.slf4j.Logger;
074    import org.slf4j.LoggerFactory;
075    
076    
077    /**
078     * An implementation of {@link PersistenceAdapter} designed for use with a
079     * {@link Journal} and then check pointing asynchronously on a timeout with some
080     * other long term persistent storage.
081     * 
082     * @org.apache.xbean.XBean element="amqPersistenceAdapter"
083     * 
084     */
085    public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, BrokerServiceAware {
086    
087        private static final Logger LOG = LoggerFactory.getLogger(AMQPersistenceAdapter.class);
088        private Scheduler scheduler;
089        private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
090        private final ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQTopicMessageStore>();
091        private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
092        private static final boolean BROKEN_FILE_LOCK;
093        private static final boolean DISABLE_LOCKING;
094        private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
095        private AsyncDataManager asyncDataManager;
096        private ReferenceStoreAdapter referenceStoreAdapter;
097        private TaskRunnerFactory taskRunnerFactory;
098        private WireFormat wireFormat = new OpenWireFormat();
099        private SystemUsage usageManager;
100        private long checkpointInterval = 1000 * 20;
101        private int maxCheckpointMessageAddSize = 1024 * 4;
102        private final AMQTransactionStore transactionStore = new AMQTransactionStore(this);
103        private TaskRunner checkpointTask;
104        private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
105        private final AtomicBoolean started = new AtomicBoolean(false);
106        private Runnable periodicCheckpointTask;
107        private Runnable periodicCleanupTask;
108        private boolean deleteAllMessages;
109        private boolean syncOnWrite;
110        private String brokerName = "";
111        private File directory;
112        private File directoryArchive;
113        private BrokerService brokerService;
114        private final AtomicLong storeSize = new AtomicLong();
115        private boolean persistentIndex=true;
116        private boolean useNio = true;
117        private boolean archiveDataLogs=false;
118        private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
119        private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
120        private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
121        private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
122        private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
123        private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
124        private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
125        private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
126        private final Map<AMQMessageStore,Map<Integer, AtomicInteger>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Map<Integer, AtomicInteger>> ();
127        private RandomAccessFile lockFile;
128        private FileLock lock;
129        private boolean disableLocking = DISABLE_LOCKING;
130            private boolean failIfJournalIsLocked;
131        private boolean lockLogged;
132        private boolean lockAquired;
133        private boolean recoverReferenceStore=true;
134        private boolean forceRecoverReferenceStore=false;
135        private boolean useDedicatedTaskRunner=false;
136        private int journalThreadPriority = Thread.MAX_PRIORITY;
137    
138        public String getBrokerName() {
139            return this.brokerName;
140        }
141    
142        public void setBrokerName(String brokerName) {
143            this.brokerName = brokerName;
144            if (this.referenceStoreAdapter != null) {
145                this.referenceStoreAdapter.setBrokerName(brokerName);
146            }
147        }
148    
149        public BrokerService getBrokerService() {
150            return brokerService;
151        }
152    
153        public void setBrokerService(BrokerService brokerService) {
154            this.brokerService = brokerService;
155        }
156    
157        public synchronized void start() throws Exception {
158            if (!started.compareAndSet(false, true)) {
159                return;
160            }
161            if (this.directory == null) {
162                if (brokerService != null) {
163                    this.directory = brokerService.getBrokerDataDirectory();
164                   
165                } else {
166                    this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
167                    this.directory = new File(directory, "amqstore");
168                    directory.getAbsolutePath();
169                }
170            }
171            if (this.directoryArchive == null) {
172                this.directoryArchive = new File(this.directory,"archive");
173            }
174            if (this.brokerService != null) {
175                this.taskRunnerFactory = this.brokerService.getTaskRunnerFactory();
176                this.scheduler = this.brokerService.getScheduler();
177            } else {
178                this.taskRunnerFactory = new TaskRunnerFactory("AMQPersistenceAdaptor Task", getJournalThreadPriority(),
179                    true, 1000, isUseDedicatedTaskRunner());
180                this.scheduler = new Scheduler("AMQPersistenceAdapter Scheduler");
181            }
182    
183            IOHelper.mkdirs(this.directory);
184            lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
185            lock();
186            LOG.info("AMQStore starting using directory: " + directory); 
187            if (archiveDataLogs) {
188                IOHelper.mkdirs(this.directoryArchive);
189            }
190    
191            if (this.usageManager != null) {
192                this.usageManager.getMemoryUsage().addUsageListener(this);
193            }
194            if (asyncDataManager == null) {
195                asyncDataManager = createAsyncDataManager();
196            }
197            if (referenceStoreAdapter == null) {
198                referenceStoreAdapter = createReferenceStoreAdapter();
199            }
200            referenceStoreAdapter.setDirectory(new File(directory, "kr-store"));
201            referenceStoreAdapter.setBrokerName(getBrokerName());
202            referenceStoreAdapter.setUsageManager(usageManager);
203            referenceStoreAdapter.setMaxDataFileLength(getMaxReferenceFileLength());
204            
205            if (failIfJournalIsLocked) {
206                asyncDataManager.lock();
207            } else {
208                while (true) {
209                    try {
210                        asyncDataManager.lock();
211                        break;
212                    } catch (IOException e) {
213                        LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.", e);
214                        try {
215                            Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
216                        } catch (InterruptedException e1) {
217                        }
218                    }
219                }
220            }
221            
222            asyncDataManager.start();
223            if (deleteAllMessages) {
224                asyncDataManager.delete();
225                try {
226                    JournalTrace trace = new JournalTrace();
227                    trace.setMessage("DELETED " + new Date());
228                    Location location = asyncDataManager.write(wireFormat.marshal(trace), false);
229                    asyncDataManager.setMark(location, true);
230                    LOG.info("Journal deleted: ");
231                    deleteAllMessages = false;
232                } catch (IOException e) {
233                    throw e;
234                } catch (Throwable e) {
235                    throw IOExceptionSupport.create(e);
236                }
237                referenceStoreAdapter.deleteAllMessages();
238            }
239            referenceStoreAdapter.start();
240            Set<Integer> files = referenceStoreAdapter.getReferenceFileIdsInUse();
241            LOG.info("Active data files: " + files);
242            checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
243    
244                public boolean iterate() {
245                    doCheckpoint();
246                    return false;
247                }
248            }, "ActiveMQ Journal Checkpoint Worker");
249            createTransactionStore();
250    
251            //
252            // The following was attempting to reduce startup times by avoiding the
253            // log
254            // file scanning that recovery performs. The problem with it is that XA
255            // transactions
256            // only live in transaction log and are not stored in the reference
257            // store, but they still
258            // need to be recovered when the broker starts up.
259    
260            if (isForceRecoverReferenceStore()
261                    || (isRecoverReferenceStore() && !referenceStoreAdapter
262                            .isStoreValid())) {
263                LOG.warn("The ReferenceStore is not valid - recovering ...");
264                recover();
265                LOG.info("Finished recovering the ReferenceStore");
266            } else {
267                Location location = writeTraceMessage("RECOVERED " + new Date(),
268                        true);
269                asyncDataManager.setMark(location, true);
270                // recover transactions
271                getTransactionStore().setPreparedTransactions(
272                        referenceStoreAdapter.retrievePreparedState());
273            }
274    
275            // Do a checkpoint periodically.
276            periodicCheckpointTask = new Runnable() {
277    
278                public void run() {
279                    checkpoint(false);
280                }
281            };
282            scheduler.executePeriodically(periodicCheckpointTask, getCheckpointInterval());
283            periodicCleanupTask = new Runnable() {
284    
285                public void run() {
286                    cleanup();
287                }
288            };
289            scheduler.executePeriodically(periodicCleanupTask, getCleanupInterval());
290            
291            if (lockAquired && lockLogged) {
292                LOG.info("Aquired lock for AMQ Store" + getDirectory());
293                if (brokerService != null) {
294                    brokerService.getBroker().nowMasterBroker();
295                }
296            }
297    
298        }
299    
300        public void stop() throws Exception {
301    
302            if (!started.compareAndSet(true, false)) {
303                return;
304            }
305            unlock();
306            if (lockFile != null) {
307                lockFile.close();
308                lockFile = null;
309            }
310            this.usageManager.getMemoryUsage().removeUsageListener(this);
311            synchronized (this) {
312                scheduler.cancel(periodicCheckpointTask);
313                scheduler.cancel(periodicCleanupTask);
314            }
315            Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
316            while (queueIterator.hasNext()) {
317                AMQMessageStore ms = queueIterator.next();
318                ms.stop();
319            }
320            Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
321            while (topicIterator.hasNext()) {
322                final AMQTopicMessageStore ms = topicIterator.next();
323                ms.stop();
324            }
325            // Take one final checkpoint and stop checkpoint processing.
326            checkpoint(true);
327            synchronized (this) {
328                checkpointTask.shutdown();
329            }
330            referenceStoreAdapter.savePreparedState(getTransactionStore().getPreparedTransactions());
331            queues.clear();
332            topics.clear();
333            IOException firstException = null;
334            referenceStoreAdapter.stop();
335            referenceStoreAdapter = null;
336    
337            if (this.brokerService == null) {
338                this.taskRunnerFactory.shutdown();
339                this.scheduler.stop();
340            }
341            try {
342                LOG.debug("Journal close");
343                asyncDataManager.close();
344            } catch (Exception e) {
345                firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
346            }
347            if (firstException != null) {
348                throw firstException;
349            }
350        }
351    
352        /**
353         * When we checkpoint we move all the journalled data to long term storage.
354         * 
355         * @param sync
356         */
357        public void checkpoint(boolean sync) {
358            try {
359                if (asyncDataManager == null) {
360                    throw new IllegalStateException("Journal is closed.");
361                }
362                CountDownLatch latch = null;
363                synchronized (this) {
364                    latch = nextCheckpointCountDownLatch;
365                    checkpointTask.wakeup();
366                }
367                if (sync) {
368                    if (LOG.isDebugEnabled()) {
369                        LOG.debug("Waitng for checkpoint to complete.");
370                    }
371                    latch.await();
372                }
373                referenceStoreAdapter.checkpoint(sync);
374            } catch (InterruptedException e) {
375                Thread.currentThread().interrupt();
376                LOG.warn("Request to start checkpoint failed: " + e, e);
377            } catch (IOException e) {
378                LOG.error("checkpoint failed: " + e, e);
379            }
380        }
381    
382        /**
383         * This does the actual checkpoint.
384         * 
385         * @return true if successful
386         */
387        public boolean doCheckpoint() {
388            CountDownLatch latch = null;
389            synchronized (this) {
390                latch = nextCheckpointCountDownLatch;
391                nextCheckpointCountDownLatch = new CountDownLatch(1);
392            }
393            try {
394                if (LOG.isDebugEnabled()) {
395                    LOG.debug("Checkpoint started.");
396                }
397    
398                Location currentMark = asyncDataManager.getMark();
399                Location newMark = currentMark;
400                Iterator<AMQMessageStore> queueIterator = queues.values().iterator();
401                while (queueIterator.hasNext()) {
402                    final AMQMessageStore ms = queueIterator.next();
403                    Location mark = ms.getMark();
404                    if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
405                        newMark = mark;
406                    }
407                }
408                Iterator<AMQTopicMessageStore> topicIterator = topics.values().iterator();
409                while (topicIterator.hasNext()) {
410                    final AMQTopicMessageStore ms = topicIterator.next();
411                    Location mark = ms.getMark();
412                    if (mark != null && (newMark == null || mark.compareTo(newMark) > 0)) {
413                        newMark = mark;
414                    }
415                }
416                try {
417                    if (newMark != currentMark) {
418                        if (LOG.isDebugEnabled()) {
419                            LOG.debug("Marking journal at: " + newMark);
420                        }
421                        asyncDataManager.setMark(newMark, false);
422                        writeTraceMessage("CHECKPOINT " + new Date(), true);
423                    }
424                } catch (Exception e) {
425                    LOG.error("Failed to mark the Journal: " + e, e);
426                }
427                if (LOG.isDebugEnabled()) {
428                    LOG.debug("Checkpoint done.");
429                }
430            } finally {
431                latch.countDown();
432            }
433            return true;
434        }
435    
436        /**
437         * Cleans up the data files
438         * @throws IOException
439         */
440        public void cleanup() {
441            try {
442                Set<Integer>inProgress = new HashSet<Integer>();
443                if (LOG.isDebugEnabled()) {
444                    LOG.debug("dataFilesInProgress.values: (" + dataFilesInProgress.values().size() + ") " + dataFilesInProgress.values());
445                }      
446                for (Map<Integer, AtomicInteger> set: dataFilesInProgress.values()) {
447                    inProgress.addAll(set.keySet());
448                }
449                Integer lastDataFile = asyncDataManager.getCurrentDataFileId();   
450                inProgress.add(lastDataFile);
451                lastDataFile = asyncDataManager.getMark().getDataFileId();
452                inProgress.addAll(referenceStoreAdapter.getReferenceFileIdsInUse());
453                Location lastActiveTx = transactionStore.checkpoint();
454                if (lastActiveTx != null) {
455                    lastDataFile = Math.min(lastDataFile, lastActiveTx.getDataFileId());
456                }
457                LOG.debug("lastDataFile: " + lastDataFile);
458                asyncDataManager.consolidateDataFilesNotIn(inProgress, lastDataFile - 1);
459            } catch (IOException e) {
460                LOG.error("Could not cleanup data files: " + e, e);
461            }
462        }
463    
464        public Set<ActiveMQDestination> getDestinations() {
465            Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(referenceStoreAdapter.getDestinations());
466            destinations.addAll(queues.keySet());
467            destinations.addAll(topics.keySet());
468            return destinations;
469        }
470    
471        MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
472            if (destination.isQueue()) {
473                return createQueueMessageStore((ActiveMQQueue)destination);
474            } else {
475                return createTopicMessageStore((ActiveMQTopic)destination);
476            }
477        }
478    
479        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
480            AMQMessageStore store = queues.get(destination);
481            if (store == null) {
482                ReferenceStore checkpointStore = referenceStoreAdapter.createQueueReferenceStore(destination);
483                store = new AMQMessageStore(this, checkpointStore, destination);
484                try {
485                    store.start();
486                } catch (Exception e) {
487                    throw IOExceptionSupport.create(e);
488                }
489                queues.put(destination, store);
490            }
491            return store;
492        }
493    
494        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
495            AMQTopicMessageStore store = topics.get(destinationName);
496            if (store == null) {
497                TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
498                store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
499                try {
500                    store.start();
501                } catch (Exception e) {
502                    throw IOExceptionSupport.create(e);
503                }
504                topics.put(destinationName, store);
505            }
506            return store;
507        }
508    
509        /**
510         * Cleanup method to remove any state associated with the given destination
511         *
512         * @param destination
513         */
514        public void removeQueueMessageStore(ActiveMQQueue destination) {
515            AMQMessageStore store= queues.remove(destination);
516            referenceStoreAdapter.removeQueueMessageStore(destination);
517        }
518    
519        /**
520         * Cleanup method to remove any state associated with the given destination
521         *
522         * @param destination
523         */
524        public void removeTopicMessageStore(ActiveMQTopic destination) {
525            topics.remove(destination);
526        }
527    
528        public TransactionStore createTransactionStore() throws IOException {
529            return transactionStore;
530        }
531    
532        public long getLastMessageBrokerSequenceId() throws IOException {
533            return referenceStoreAdapter.getLastMessageBrokerSequenceId();
534        }
535    
536        public void beginTransaction(ConnectionContext context) throws IOException {
537            referenceStoreAdapter.beginTransaction(context);
538        }
539    
540        public void commitTransaction(ConnectionContext context) throws IOException {
541            referenceStoreAdapter.commitTransaction(context);
542        }
543    
544        public void rollbackTransaction(ConnectionContext context) throws IOException {
545            referenceStoreAdapter.rollbackTransaction(context);
546        }
547        
548        public boolean isPersistentIndex() {
549                    return persistentIndex;
550            }
551    
552            public void setPersistentIndex(boolean persistentIndex) {
553                    this.persistentIndex = persistentIndex;
554            }
555    
556        /**
557         * @param location
558         * @return
559         * @throws IOException
560         */
561        public DataStructure readCommand(Location location) throws IOException {
562            try {
563                ByteSequence packet = asyncDataManager.read(location);
564                return (DataStructure)wireFormat.unmarshal(packet);
565            } catch (IOException e) {
566                throw createReadException(location, e);
567            }
568        }
569    
570        /**
571         * Move all the messages that were in the journal into long term storage. We
572         * just replay and do a checkpoint.
573         * 
574         * @throws IOException
575         * @throws IOException
576         * @throws IllegalStateException
577         */
578        private void recover() throws IllegalStateException, IOException {
579            referenceStoreAdapter.clearMessages();
580            Location pos = null;
581            int redoCounter = 0;
582            LOG.info("Journal Recovery Started from: " + asyncDataManager);
583            long start = System.currentTimeMillis();
584            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
585            // While we have records in the journal.
586            while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
587                ByteSequence data = asyncDataManager.read(pos);
588                DataStructure c = (DataStructure)wireFormat.unmarshal(data);
589                if (c instanceof Message) {
590                    Message message = (Message)c;
591                    AMQMessageStore store = (AMQMessageStore)createMessageStore(message.getDestination());
592                    if (message.isInTransaction()) {
593                        transactionStore.addMessage(store, message, pos);
594                    } else {
595                        if (store.replayAddMessage(context, message, pos)) {
596                            redoCounter++;
597                        }
598                    }
599                } else {
600                    switch (c.getDataStructureType()) {
601                    case SubscriptionInfo.DATA_STRUCTURE_TYPE: {
602                        referenceStoreAdapter.recoverSubscription((SubscriptionInfo)c);
603                    }
604                        break;
605                    case JournalQueueAck.DATA_STRUCTURE_TYPE: {
606                        JournalQueueAck command = (JournalQueueAck)c;
607                        AMQMessageStore store = (AMQMessageStore)createMessageStore(command.getDestination());
608                        if (command.getMessageAck().isInTransaction()) {
609                            transactionStore.removeMessage(store, command.getMessageAck(), pos);
610                        } else {
611                            if (store.replayRemoveMessage(context, command.getMessageAck())) {
612                                redoCounter++;
613                            }
614                        }
615                    }
616                        break;
617                    case JournalTopicAck.DATA_STRUCTURE_TYPE: {
618                        JournalTopicAck command = (JournalTopicAck)c;
619                        AMQTopicMessageStore store = (AMQTopicMessageStore)createMessageStore(command.getDestination());
620                        if (command.getTransactionId() != null) {
621                            transactionStore.acknowledge(store, command, pos);
622                        } else {
623                            if (store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId())) {
624                                redoCounter++;
625                            }
626                        }
627                    }
628                        break;
629                    case JournalTransaction.DATA_STRUCTURE_TYPE: {
630                        JournalTransaction command = (JournalTransaction)c;
631                        try {
632                            // Try to replay the packet.
633                            switch (command.getType()) {
634                            case JournalTransaction.XA_PREPARE:
635                                transactionStore.replayPrepare(command.getTransactionId());
636                                break;
637                            case JournalTransaction.XA_COMMIT:
638                            case JournalTransaction.LOCAL_COMMIT:
639                                AMQTx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
640                                if (tx == null) {
641                                    break; // We may be trying to replay a commit
642                                }
643                                // that
644                                // was already committed.
645                                // Replay the committed operations.
646                                tx.getOperations();
647                                for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
648                                    AMQTxOperation op = (AMQTxOperation)iter.next();
649                                    if (op.replay(this, context)) {
650                                        redoCounter++;
651                                    }
652                                }
653                                break;
654                            case JournalTransaction.LOCAL_ROLLBACK:
655                            case JournalTransaction.XA_ROLLBACK:
656                                transactionStore.replayRollback(command.getTransactionId());
657                                break;
658                            default:
659                                throw new IOException("Invalid journal command type: " + command.getType());
660                            }
661                        } catch (IOException e) {
662                            LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
663                        }
664                    }
665                        break;
666                    case JournalTrace.DATA_STRUCTURE_TYPE:
667                        JournalTrace trace = (JournalTrace)c;
668                        LOG.debug("TRACE Entry: " + trace.getMessage());
669                        break;
670                    default:
671                        LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
672                    }
673                }
674            }
675            Location location = writeTraceMessage("RECOVERED " + new Date(), true);
676            asyncDataManager.setMark(location, true);
677            long end = System.currentTimeMillis();
678            LOG.info("Recovered " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
679        }
680    
681        private IOException createReadException(Location location, Exception e) {
682            return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
683        }
684    
685        protected IOException createWriteException(DataStructure packet, Exception e) {
686            return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
687        }
688    
689        protected IOException createWriteException(String command, Exception e) {
690            return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
691        }
692    
693        protected IOException createRecoveryFailedException(Exception e) {
694            return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
695        }
696    
697        /**
698         * @param command
699         * @param syncHint
700         * @return
701         * @throws IOException
702         */
703        public Location writeCommand(DataStructure command, boolean syncHint) throws IOException {
704            return writeCommand(command, syncHint,false);
705        }
706        
707        public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
708            try {
709                    return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
710            } catch (IOException ioe) {
711                    LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
712                    brokerService.handleIOException(ioe);
713                    throw ioe;
714            }
715        }
716    
717        private Location writeTraceMessage(String message, boolean sync) throws IOException {
718            JournalTrace trace = new JournalTrace();
719            trace.setMessage(message);
720            return writeCommand(trace, sync);
721        }
722    
723        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
724            newPercentUsage = (newPercentUsage / 10) * 10;
725            oldPercentUsage = (oldPercentUsage / 10) * 10;
726            if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
727                checkpoint(false);
728            }
729        }
730    
731        public AMQTransactionStore getTransactionStore() {
732            return transactionStore;
733        }
734    
735        public synchronized void deleteAllMessages() throws IOException {
736            deleteAllMessages = true;
737        }
738    
739        @Override
740        public String toString() {
741            return "AMQPersistenceAdapter(" + directory + ")";
742        }
743    
744        // /////////////////////////////////////////////////////////////////
745        // Subclass overridables
746        // /////////////////////////////////////////////////////////////////
747        protected AsyncDataManager createAsyncDataManager() {
748            AsyncDataManager manager = new AsyncDataManager(storeSize);
749            manager.setDirectory(new File(directory, "journal"));
750            manager.setDirectoryArchive(getDirectoryArchive());
751            manager.setArchiveDataLogs(isArchiveDataLogs());
752            manager.setMaxFileLength(maxFileLength);
753            manager.setUseNio(useNio);    
754            return manager;
755        }
756    
757        protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
758            KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
759            adaptor.setPersistentIndex(isPersistentIndex());
760            adaptor.setIndexBinSize(getIndexBinSize());
761            adaptor.setIndexKeySize(getIndexKeySize());
762            adaptor.setIndexPageSize(getIndexPageSize());
763            adaptor.setIndexMaxBinSize(getIndexMaxBinSize());
764            adaptor.setIndexLoadFactor(getIndexLoadFactor());
765            return adaptor;
766        }
767    
768        // /////////////////////////////////////////////////////////////////
769        // Property Accessors
770        // /////////////////////////////////////////////////////////////////
771        public AsyncDataManager getAsyncDataManager() {
772            return asyncDataManager;
773        }
774    
775        public void setAsyncDataManager(AsyncDataManager asyncDataManager) {
776            this.asyncDataManager = asyncDataManager;
777        }
778    
779        public ReferenceStoreAdapter getReferenceStoreAdapter() {
780            return referenceStoreAdapter;
781        }
782    
783        public TaskRunnerFactory getTaskRunnerFactory() {
784            return taskRunnerFactory;
785        }
786    
787        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
788            this.taskRunnerFactory = taskRunnerFactory;
789        }
790    
791        /**
792         * @return Returns the wireFormat.
793         */
794        public WireFormat getWireFormat() {
795            return wireFormat;
796        }
797    
798        public void setWireFormat(WireFormat wireFormat) {
799            this.wireFormat = wireFormat;
800        }
801    
802        public SystemUsage getUsageManager() {
803            return usageManager;
804        }
805    
806        public void setUsageManager(SystemUsage usageManager) {
807            this.usageManager = usageManager;
808        }
809    
810        public int getMaxCheckpointMessageAddSize() {
811            return maxCheckpointMessageAddSize;
812        }
813    
814        /** 
815         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
816         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
817         */
818        public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
819            this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
820        }
821    
822       
823        public synchronized File getDirectory() {
824            return directory;
825        }
826    
827        public synchronized void setDirectory(File directory) {
828            this.directory = directory;
829        }
830    
831        public boolean isSyncOnWrite() {
832            return this.syncOnWrite;
833        }
834    
835        public void setSyncOnWrite(boolean syncOnWrite) {
836            this.syncOnWrite = syncOnWrite;
837        }
838        
839        /**
840         * @param referenceStoreAdapter the referenceStoreAdapter to set
841         */
842        public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
843            this.referenceStoreAdapter = referenceStoreAdapter;
844        }
845        
846        public long size(){
847            return storeSize.get();
848        }
849    
850            public boolean isUseNio() {
851                    return useNio;
852            }
853    
854            public void setUseNio(boolean useNio) {
855                    this.useNio = useNio;
856            }
857    
858            public int getMaxFileLength() {
859                    return maxFileLength;
860            }
861    
862             /**
863          * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
864          * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
865          */
866            public void setMaxFileLength(int maxFileLength) {
867                    this.maxFileLength = maxFileLength;
868            }
869            
870            public long getCleanupInterval() {
871            return cleanupInterval;
872        }
873    
874        public void setCleanupInterval(long cleanupInterval) {
875            this.cleanupInterval = cleanupInterval;
876        }
877    
878        public long getCheckpointInterval() {
879            return checkpointInterval;
880        }
881    
882        public void setCheckpointInterval(long checkpointInterval) {
883            this.checkpointInterval = checkpointInterval;
884        }
885        
886        public int getIndexBinSize() {
887            return indexBinSize;
888        }
889    
890        public void setIndexBinSize(int indexBinSize) {
891            this.indexBinSize = indexBinSize;
892        }
893    
894        public int getIndexKeySize() {
895            return indexKeySize;
896        }
897    
898        public void setIndexKeySize(int indexKeySize) {
899            this.indexKeySize = indexKeySize;
900        }
901    
902        public int getIndexPageSize() {
903            return indexPageSize;
904        }
905        
906        public int getIndexMaxBinSize() {
907            return indexMaxBinSize;
908        }
909    
910        public void setIndexMaxBinSize(int maxBinSize) {
911            this.indexMaxBinSize = maxBinSize;
912        }
913    
914        /**
915         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
916         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
917         */
918        public void setIndexPageSize(int indexPageSize) {
919            this.indexPageSize = indexPageSize;
920        }
921        
922        public void setIndexLoadFactor(int factor){
923            this.indexLoadFactor=factor;    
924        }
925        
926        public int getIndexLoadFactor(){
927            return this.indexLoadFactor;
928        }
929        
930        public int getMaxReferenceFileLength() {
931            return maxReferenceFileLength;
932        }
933    
934        /**
935         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
936         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
937         */
938        public void setMaxReferenceFileLength(int maxReferenceFileLength) {
939            this.maxReferenceFileLength = maxReferenceFileLength;
940        }
941        
942        public File getDirectoryArchive() {
943            return directoryArchive;
944        }
945    
946        public void setDirectoryArchive(File directoryArchive) {
947            this.directoryArchive = directoryArchive;
948        }
949    
950        public boolean isArchiveDataLogs() {
951            return archiveDataLogs;
952        }
953    
954        public void setArchiveDataLogs(boolean archiveDataLogs) {
955            this.archiveDataLogs = archiveDataLogs;
956        }  
957        
958        public boolean isDisableLocking() {
959            return disableLocking;
960        }
961    
962        public void setDisableLocking(boolean disableLocking) {
963            this.disableLocking = disableLocking;
964        }
965        
966        /**
967         * @return the recoverReferenceStore
968         */
969        public boolean isRecoverReferenceStore() {
970            return recoverReferenceStore;
971        }
972    
973        /**
974         * @param recoverReferenceStore the recoverReferenceStore to set
975         */
976        public void setRecoverReferenceStore(boolean recoverReferenceStore) {
977            this.recoverReferenceStore = recoverReferenceStore;
978        }
979    
980        /**
981         * @return the forceRecoverReferenceStore
982         */
983        public boolean isForceRecoverReferenceStore() {
984            return forceRecoverReferenceStore;
985        }
986    
987        /**
988         * @param forceRecoverReferenceStore the forceRecoverReferenceStore to set
989         */
990        public void setForceRecoverReferenceStore(boolean forceRecoverReferenceStore) {
991            this.forceRecoverReferenceStore = forceRecoverReferenceStore;
992        }
993        
994        public boolean isUseDedicatedTaskRunner() {
995            return useDedicatedTaskRunner;
996        }
997        
998        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
999            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1000        }
1001        
1002        /**
1003         * @return the journalThreadPriority
1004         */
1005        public int getJournalThreadPriority() {
1006            return this.journalThreadPriority;
1007        }
1008    
1009        /**
1010         * @param journalThreadPriority the journalThreadPriority to set
1011         */
1012        public void setJournalThreadPriority(int journalThreadPriority) {
1013            this.journalThreadPriority = journalThreadPriority;
1014        }
1015    
1016            
1017            protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
1018                Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1019                if (map == null) {
1020                    map = new ConcurrentHashMap<Integer, AtomicInteger>();
1021                    dataFilesInProgress.put(store, map);
1022                }
1023                AtomicInteger count = map.get(dataFileId);
1024                if (count == null) {
1025                    count = new AtomicInteger(0);
1026                    map.put(dataFileId, count);
1027                }
1028                count.incrementAndGet();
1029            }
1030            
1031            protected void removeInProgressDataFile(AMQMessageStore store,int dataFileId) {
1032            Map<Integer, AtomicInteger> map = dataFilesInProgress.get(store);
1033            if (map != null) {
1034                AtomicInteger count = map.get(dataFileId);
1035                if (count != null) {
1036                    int newCount = count.decrementAndGet(); 
1037                    if (newCount <=0) {
1038                        map.remove(dataFileId);
1039                    }
1040                }
1041                if (map.isEmpty()) {
1042                    dataFilesInProgress.remove(store);
1043                }
1044            }
1045        }
1046            
1047            
1048            protected void lock() throws Exception {
1049            lockLogged = false;
1050            lockAquired = false;
1051            do {
1052                if (doLock()) {
1053                    lockAquired = true;
1054                } else {
1055                    if (!lockLogged) {
1056                        LOG.warn("Waiting to Lock the Store " + getDirectory());
1057                        lockLogged = true;
1058                    }
1059                    Thread.sleep(1000);
1060                }
1061    
1062            } while (!lockAquired && !disableLocking);
1063        }
1064            
1065            private synchronized void unlock() throws IOException {
1066            if (!disableLocking && (null != lock)) {
1067                //clear property doesn't work on some platforms
1068                System.getProperties().remove(getPropertyKey());
1069                System.clearProperty(getPropertyKey());
1070                assert(System.getProperty(getPropertyKey())==null);
1071                if (lock.isValid()) {
1072                    lock.release();
1073                    lock.channel().close();
1074                    
1075                }
1076                lock = null;
1077            }
1078        }
1079    
1080            
1081            protected boolean doLock() throws IOException {
1082                boolean result = true;
1083                if (!disableLocking && directory != null && lock == null) {
1084                String key = getPropertyKey();
1085                String property = System.getProperty(key);
1086                if (null == property) {
1087                    if (!BROKEN_FILE_LOCK) {
1088                        lock = lockFile.getChannel().tryLock(0, Math.max(1, lockFile.getChannel().size()), false);
1089                        if (lock == null) {
1090                            result = false;
1091                        } else {
1092                            System.setProperty(key, new Date().toString());
1093                        }
1094                    }
1095                } else { // already locked
1096                    result = false;
1097                }
1098            }
1099                return result;
1100            }
1101            
1102            private String getPropertyKey() throws IOException {
1103            return getClass().getName() + ".lock." + directory.getCanonicalPath();
1104        }
1105            
1106            static {
1107                BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
1108                        + ".FileLockBroken",
1109                        "false"));
1110                DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
1111                       + ".DisableLocking",
1112                       "false"));
1113            }
1114    
1115            
1116        public long getLastProducerSequenceId(ProducerId id) {
1117            // reference store send has adequate duplicate suppression
1118            return -1;
1119        }
1120    }