001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.journal;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.HashSet;
023    import java.util.Iterator;
024    import java.util.Set;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.CountDownLatch;
028    import java.util.concurrent.FutureTask;
029    import java.util.concurrent.LinkedBlockingQueue;
030    import java.util.concurrent.ThreadFactory;
031    import java.util.concurrent.ThreadPoolExecutor;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicBoolean;
034    import org.apache.activeio.journal.InvalidRecordLocationException;
035    import org.apache.activeio.journal.Journal;
036    import org.apache.activeio.journal.JournalEventListener;
037    import org.apache.activeio.journal.RecordLocation;
038    import org.apache.activeio.packet.ByteArrayPacket;
039    import org.apache.activeio.packet.Packet;
040    import org.apache.activemq.broker.BrokerService;
041    import org.apache.activemq.broker.BrokerServiceAware;
042    import org.apache.activemq.broker.ConnectionContext;
043    import org.apache.activemq.command.ActiveMQDestination;
044    import org.apache.activemq.command.ActiveMQQueue;
045    import org.apache.activemq.command.ActiveMQTopic;
046    import org.apache.activemq.command.DataStructure;
047    import org.apache.activemq.command.JournalQueueAck;
048    import org.apache.activemq.command.JournalTopicAck;
049    import org.apache.activemq.command.JournalTrace;
050    import org.apache.activemq.command.JournalTransaction;
051    import org.apache.activemq.command.Message;
052    import org.apache.activemq.command.MessageAck;
053    import org.apache.activemq.command.ProducerId;
054    import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
055    import org.apache.activemq.openwire.OpenWireFormat;
056    import org.apache.activemq.store.MessageStore;
057    import org.apache.activemq.store.PersistenceAdapter;
058    import org.apache.activemq.store.TopicMessageStore;
059    import org.apache.activemq.store.TransactionStore;
060    import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
061    import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
062    import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
063    import org.apache.activemq.thread.Scheduler;
064    import org.apache.activemq.thread.Task;
065    import org.apache.activemq.thread.TaskRunner;
066    import org.apache.activemq.thread.TaskRunnerFactory;
067    import org.apache.activemq.usage.SystemUsage;
068    import org.apache.activemq.usage.Usage;
069    import org.apache.activemq.usage.UsageListener;
070    import org.apache.activemq.util.ByteSequence;
071    import org.apache.activemq.util.IOExceptionSupport;
072    import org.apache.activemq.util.ThreadPoolUtils;
073    import org.apache.activemq.wireformat.WireFormat;
074    import org.slf4j.Logger;
075    import org.slf4j.LoggerFactory;
076    
077    /**
078     * An implementation of {@link PersistenceAdapter} designed for use with a
079     * {@link Journal} and then check pointing asynchronously on a timeout with some
080     * other long term persistent storage.
081     * 
082     * @org.apache.xbean.XBean
083     * 
084     */
085    public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
086    
087        private BrokerService brokerService; // set via setBrokerService(); told about journal write failures in writeCommand()
088            
           // Created in start(); runs periodicCheckpointTask at a fixed period.
089        protected Scheduler scheduler;
090        private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
091    
           // The journal that commands are written to and replayed from.
092        private Journal journal;
           // The adapter that this class delegates to and starts/stops alongside itself.
093        private PersistenceAdapter longTermPersistence;
094    
           // Used to marshal commands into journal packets and to unmarshal them again.
095        private final WireFormat wireFormat = new OpenWireFormat();
096    
           // Journal-backed stores created so far, keyed by destination.
097        private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
098        private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
099    
100        private SystemUsage usageManager;
           // Five minutes, in milliseconds. The periodic task requests a full
           // checkpoint once no checkpoint has been requested for this long.
101        private final long checkpointInterval = 1000 * 60 * 5;
           // Time of the most recent checkpoint request; written under
           // synchronized(this) in checkpoint(boolean, boolean).
102        private long lastCheckpointRequest = System.currentTimeMillis();
           // Time of the last JDBC cleanup triggered from doCheckpoint().
103        private long lastCleanup = System.currentTimeMillis();
           // Core and maximum size of the checkpointExecutor pool built in start().
104        private int maxCheckpointWorkers = 10;
           // Only exposed through its getter/setter in this class.
           // NOTE(review): presumably a byte limit read by the message stores - confirm.
105        private int maxCheckpointMessageAddSize = 1024 * 1024;
106    
107        private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
           // Pool that runs the per-store checkpoint tasks submitted by doCheckpoint().
108        private ThreadPoolExecutor checkpointExecutor;
109    
           // Task runner that invokes doCheckpoint(); woken by checkpoint(boolean, boolean).
110        private TaskRunner checkpointTask;
           // Counted down when the checkpoint pass that picked it up finishes;
           // synchronous checkpoint requests wait on it.
111        private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
           // Set when a full checkpoint has been requested; consumed and cleared by doCheckpoint().
112        private boolean fullCheckPoint;
113    
114        private final AtomicBoolean started = new AtomicBoolean(false);
115    
116        private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
117    
118        private TaskRunnerFactory taskRunnerFactory;
119        private File directory;
120    
121        public JournalPersistenceAdapter() {        
122        }
123        
124        public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
125            setJournal(journal);
126            setTaskRunnerFactory(taskRunnerFactory);
127            setPersistenceAdapter(longTermPersistence);
128        }
129    
130        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
131            this.taskRunnerFactory = taskRunnerFactory;
132        }
133    
134        public void setJournal(Journal journal) {
135            this.journal = journal;
136            journal.setJournalEventListener(this);
137        }
138        
139        public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
140            this.longTermPersistence = longTermPersistence;
141        }
142        
143        final Runnable createPeriodicCheckpointTask() {
144            return new Runnable() {
145                public void run() {
146                    long lastTime = 0;
147                    synchronized (this) {
148                        lastTime = lastCheckpointRequest;
149                    }
150                    if (System.currentTimeMillis() > lastTime + checkpointInterval) {
151                        checkpoint(false, true);
152                    }
153                }
154            };
155        }
156    
157        /**
158         * @param usageManager The UsageManager that is controlling the
159         *                destination's memory usage.
160         */
161        public void setUsageManager(SystemUsage usageManager) {
162            this.usageManager = usageManager;
163            longTermPersistence.setUsageManager(usageManager);
164        }
165    
166        public Set<ActiveMQDestination> getDestinations() {
167            Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
168            destinations.addAll(queues.keySet());
169            destinations.addAll(topics.keySet());
170            return destinations;
171        }
172    
173        private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
174            if (destination.isQueue()) {
175                return createQueueMessageStore((ActiveMQQueue)destination);
176            } else {
177                return createTopicMessageStore((ActiveMQTopic)destination);
178            }
179        }
180    
181        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
182            JournalMessageStore store = queues.get(destination);
183            if (store == null) {
184                MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
185                store = new JournalMessageStore(this, checkpointStore, destination);
186                queues.put(destination, store);
187            }
188            return store;
189        }
190    
191        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
192            JournalTopicMessageStore store = topics.get(destinationName);
193            if (store == null) {
194                TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
195                store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
196                topics.put(destinationName, store);
197            }
198            return store;
199        }
200    
201        /**
202         * Cleanup method to remove any state associated with the given destination
203         *
204         * @param destination Destination to forget
205         */
206        public void removeQueueMessageStore(ActiveMQQueue destination) {
207            queues.remove(destination);
208        }
209    
210        /**
211         * Cleanup method to remove any state associated with the given destination
212         *
213         * @param destination Destination to forget
214         */
215        public void removeTopicMessageStore(ActiveMQTopic destination) {
216            topics.remove(destination);
217        }
218    
219        public TransactionStore createTransactionStore() throws IOException {
220            return transactionStore;
221        }
222    
223        public long getLastMessageBrokerSequenceId() throws IOException {
224            return longTermPersistence.getLastMessageBrokerSequenceId();
225        }
226    
227        public void beginTransaction(ConnectionContext context) throws IOException {
228            longTermPersistence.beginTransaction(context);
229        }
230    
231        public void commitTransaction(ConnectionContext context) throws IOException {
232            longTermPersistence.commitTransaction(context);
233        }
234    
235        public void rollbackTransaction(ConnectionContext context) throws IOException {
236            longTermPersistence.rollbackTransaction(context);
237        }
238    
239        public synchronized void start() throws Exception {
240            if (!started.compareAndSet(false, true)) {
241                return;
242            }
243    
244            checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
245                public boolean iterate() {
246                    return doCheckpoint();
247                }
248            }, "ActiveMQ Journal Checkpoint Worker");
249    
250            checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
251                public Thread newThread(Runnable runable) {
252                    Thread t = new Thread(runable, "Journal checkpoint worker");
253                    t.setPriority(7);
254                    return t;
255                }
256            });
257            // checkpointExecutor.allowCoreThreadTimeOut(true);
258    
259            this.usageManager.getMemoryUsage().addUsageListener(this);
260    
261            if (longTermPersistence instanceof JDBCPersistenceAdapter) {
262                // Disabled periodic clean up as it deadlocks with the checkpoint
263                // operations.
264                ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
265            }
266    
267            longTermPersistence.start();
268            createTransactionStore();
269            recover();
270    
271            // Do a checkpoint periodically.
272            this.scheduler = new Scheduler("Journal Scheduler");
273            this.scheduler.start();
274            this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
275    
276        }
277    
278        public void stop() throws Exception {
279    
280            this.usageManager.getMemoryUsage().removeUsageListener(this);
281            if (!started.compareAndSet(true, false)) {
282                return;
283            }
284    
285            this.scheduler.cancel(periodicCheckpointTask);
286            this.scheduler.stop();
287    
288            // Take one final checkpoint and stop checkpoint processing.
289            checkpoint(true, true);
290            checkpointTask.shutdown();
291            ThreadPoolUtils.shutdown(checkpointExecutor);
292            checkpointExecutor = null;
293    
294            queues.clear();
295            topics.clear();
296    
297            IOException firstException = null;
298            try {
299                journal.close();
300            } catch (Exception e) {
301                firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
302            }
303            longTermPersistence.stop();
304    
305            if (firstException != null) {
306                throw firstException;
307            }
308        }
309    
310        // Properties
311        // -------------------------------------------------------------------------
312        public PersistenceAdapter getLongTermPersistence() {
313            return longTermPersistence;
314        }
315    
316        /**
317         * @return Returns the wireFormat.
318         */
319        public WireFormat getWireFormat() {
320            return wireFormat;
321        }
322    
323        // Implementation methods
324        // -------------------------------------------------------------------------
325    
326        /**
327         * The Journal give us a call back so that we can move old data out of the
328         * journal. Taking a checkpoint does this for us.
329         * 
330         * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
331         */
332        public void overflowNotification(RecordLocation safeLocation) {
333            checkpoint(false, true);
334        }
335    
336        /**
337         * When we checkpoint we move all the journalled data to long term storage.
338         * 
339         */
340        public void checkpoint(boolean sync, boolean fullCheckpoint) {
341            try {
342                if (journal == null) {
343                    throw new IllegalStateException("Journal is closed.");
344                }
345    
346                long now = System.currentTimeMillis();
347                CountDownLatch latch = null;
348                synchronized (this) {
349                    latch = nextCheckpointCountDownLatch;
350                    lastCheckpointRequest = now;
351                    if (fullCheckpoint) {
352                        this.fullCheckPoint = true;
353                    }
354                }
355    
356                checkpointTask.wakeup();
357    
358                if (sync) {
359                    LOG.debug("Waking for checkpoint to complete.");
360                    latch.await();
361                }
362            } catch (InterruptedException e) {
363                Thread.currentThread().interrupt();
364                LOG.warn("Request to start checkpoint failed: " + e, e);
365            }
366        }
367    
368        public void checkpoint(boolean sync) {
369            checkpoint(sync, sync);
370        }
371    
372        /**
373         * This does the actual checkpoint.
374         * 
375         * @return
376         */
377        public boolean doCheckpoint() {
378            CountDownLatch latch = null;
379            boolean fullCheckpoint;
380            synchronized (this) {
381                latch = nextCheckpointCountDownLatch;
382                nextCheckpointCountDownLatch = new CountDownLatch(1);
383                fullCheckpoint = this.fullCheckPoint;
384                this.fullCheckPoint = false;
385            }
386            try {
387    
388                LOG.debug("Checkpoint started.");
389                RecordLocation newMark = null;
390    
391                ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
392    
393                //
394                // We do many partial checkpoints (fullCheckpoint==false) to move
395                // topic messages
396                // to long term store as soon as possible.
397                // 
398                // We want to avoid doing that for queue messages since removes the
399                // come in the same
400                // checkpoint cycle will nullify the previous message add.
401                // Therefore, we only
402                // checkpoint queues on the fullCheckpoint cycles.
403                //
404                if (fullCheckpoint) {
405                    Iterator<JournalMessageStore> iterator = queues.values().iterator();
406                    while (iterator.hasNext()) {
407                        try {
408                            final JournalMessageStore ms = iterator.next();
409                            FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
410                                public RecordLocation call() throws Exception {
411                                    return ms.checkpoint();
412                                }
413                            });
414                            futureTasks.add(task);
415                            checkpointExecutor.execute(task);
416                        } catch (Exception e) {
417                            LOG.error("Failed to checkpoint a message store: " + e, e);
418                        }
419                    }
420                }
421    
422                Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
423                while (iterator.hasNext()) {
424                    try {
425                        final JournalTopicMessageStore ms = iterator.next();
426                        FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
427                            public RecordLocation call() throws Exception {
428                                return ms.checkpoint();
429                            }
430                        });
431                        futureTasks.add(task);
432                        checkpointExecutor.execute(task);
433                    } catch (Exception e) {
434                        LOG.error("Failed to checkpoint a message store: " + e, e);
435                    }
436                }
437    
438                try {
439                    for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
440                        FutureTask<RecordLocation> ft = iter.next();
441                        RecordLocation mark = ft.get();
442                        // We only set a newMark on full checkpoints.
443                        if (fullCheckpoint) {
444                            if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
445                                newMark = mark;
446                            }
447                        }
448                    }
449                } catch (Throwable e) {
450                    LOG.error("Failed to checkpoint a message store: " + e, e);
451                }
452    
453                if (fullCheckpoint) {
454                    try {
455                        if (newMark != null) {
456                            LOG.debug("Marking journal at: " + newMark);
457                            journal.setMark(newMark, true);
458                        }
459                    } catch (Exception e) {
460                        LOG.error("Failed to mark the Journal: " + e, e);
461                    }
462    
463                    if (longTermPersistence instanceof JDBCPersistenceAdapter) {
464                        // We may be check pointing more often than the
465                        // checkpointInterval if under high use
466                        // But we don't want to clean up the db that often.
467                        long now = System.currentTimeMillis();
468                        if (now > lastCleanup + checkpointInterval) {
469                            lastCleanup = now;
470                            ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
471                        }
472                    }
473                }
474    
475                LOG.debug("Checkpoint done.");
476            } finally {
477                latch.countDown();
478            }
479            synchronized (this) {
480                return this.fullCheckPoint;
481            }
482    
483        }
484    
485        /**
486         * @param location
487         * @return
488         * @throws IOException
489         */
490        public DataStructure readCommand(RecordLocation location) throws IOException {
491            try {
492                Packet packet = journal.read(location);
493                return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
494            } catch (InvalidRecordLocationException e) {
495                throw createReadException(location, e);
496            } catch (IOException e) {
497                throw createReadException(location, e);
498            }
499        }
500    
501        /**
502         * Move all the messages that were in the journal into long term storage. We
503         * just replay and do a checkpoint.
504         * 
505         * @throws IOException
506         * @throws IOException
507         * @throws InvalidRecordLocationException
508         * @throws IllegalStateException
509         */
510        private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
511    
512            RecordLocation pos = null;
513            int transactionCounter = 0;
514    
515            LOG.info("Journal Recovery Started from: " + journal);
516            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
517    
518            // While we have records in the journal.
519            while ((pos = journal.getNextRecordLocation(pos)) != null) {
520                Packet data = journal.read(pos);
521                DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
522    
523                if (c instanceof Message) {
524                    Message message = (Message)c;
525                    JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
526                    if (message.isInTransaction()) {
527                        transactionStore.addMessage(store, message, pos);
528                    } else {
529                        store.replayAddMessage(context, message);
530                        transactionCounter++;
531                    }
532                } else {
533                    switch (c.getDataStructureType()) {
534                    case JournalQueueAck.DATA_STRUCTURE_TYPE: {
535                        JournalQueueAck command = (JournalQueueAck)c;
536                        JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
537                        if (command.getMessageAck().isInTransaction()) {
538                            transactionStore.removeMessage(store, command.getMessageAck(), pos);
539                        } else {
540                            store.replayRemoveMessage(context, command.getMessageAck());
541                            transactionCounter++;
542                        }
543                    }
544                        break;
545                    case JournalTopicAck.DATA_STRUCTURE_TYPE: {
546                        JournalTopicAck command = (JournalTopicAck)c;
547                        JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
548                        if (command.getTransactionId() != null) {
549                            transactionStore.acknowledge(store, command, pos);
550                        } else {
551                            store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
552                            transactionCounter++;
553                        }
554                    }
555                        break;
556                    case JournalTransaction.DATA_STRUCTURE_TYPE: {
557                        JournalTransaction command = (JournalTransaction)c;
558                        try {
559                            // Try to replay the packet.
560                            switch (command.getType()) {
561                            case JournalTransaction.XA_PREPARE:
562                                transactionStore.replayPrepare(command.getTransactionId());
563                                break;
564                            case JournalTransaction.XA_COMMIT:
565                            case JournalTransaction.LOCAL_COMMIT:
566                                Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
567                                if (tx == null) {
568                                    break; // We may be trying to replay a commit
569                                }
570                                // that
571                                // was already committed.
572    
573                                // Replay the committed operations.
574                                tx.getOperations();
575                                for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
576                                    TxOperation op = (TxOperation)iter.next();
577                                    if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
578                                        op.store.replayAddMessage(context, (Message)op.data);
579                                    }
580                                    if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
581                                        op.store.replayRemoveMessage(context, (MessageAck)op.data);
582                                    }
583                                    if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
584                                        JournalTopicAck ack = (JournalTopicAck)op.data;
585                                        ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
586                                    }
587                                }
588                                transactionCounter++;
589                                break;
590                            case JournalTransaction.LOCAL_ROLLBACK:
591                            case JournalTransaction.XA_ROLLBACK:
592                                transactionStore.replayRollback(command.getTransactionId());
593                                break;
594                            default:
595                                throw new IOException("Invalid journal command type: " + command.getType());
596                            }
597                        } catch (IOException e) {
598                            LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
599                        }
600                    }
601                        break;
602                    case JournalTrace.DATA_STRUCTURE_TYPE:
603                        JournalTrace trace = (JournalTrace)c;
604                        LOG.debug("TRACE Entry: " + trace.getMessage());
605                        break;
606                    default:
607                        LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
608                    }
609                }
610            }
611    
612            RecordLocation location = writeTraceMessage("RECOVERED", true);
613            journal.setMark(location, true);
614    
615            LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
616        }
617    
618        private IOException createReadException(RecordLocation location, Exception e) {
619            return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
620        }
621    
622        protected IOException createWriteException(DataStructure packet, Exception e) {
623            return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
624        }
625    
626        protected IOException createWriteException(String command, Exception e) {
627            return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
628        }
629    
630        protected IOException createRecoveryFailedException(Exception e) {
631            return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
632        }
633    
634        /**
635         * @param command
636         * @param sync
637         * @return
638         * @throws IOException
639         */
640        public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
641            if (started.get()) {
642                try {
643                        return journal.write(toPacket(wireFormat.marshal(command)), sync);
644                } catch (IOException ioe) {
645                        LOG.error("Cannot write to the journal", ioe);
646                        brokerService.handleIOException(ioe);
647                        throw ioe;
648                }
649            }
650            throw new IOException("closed");
651        }
652    
653        private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
654            JournalTrace trace = new JournalTrace();
655            trace.setMessage(message);
656            return writeCommand(trace, sync);
657        }
658    
659        public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
660            newPercentUsage = (newPercentUsage / 10) * 10;
661            oldPercentUsage = (oldPercentUsage / 10) * 10;
662            if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
663                boolean sync = newPercentUsage >= 90;
664                checkpoint(sync, true);
665            }
666        }
667    
668        public JournalTransactionStore getTransactionStore() {
669            return transactionStore;
670        }
671    
672        public void deleteAllMessages() throws IOException {
673            try {
674                JournalTrace trace = new JournalTrace();
675                trace.setMessage("DELETED");
676                RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
677                journal.setMark(location, true);
678                LOG.info("Journal deleted: ");
679            } catch (IOException e) {
680                throw e;
681            } catch (Throwable e) {
682                throw IOExceptionSupport.create(e);
683            }
684            longTermPersistence.deleteAllMessages();
685        }
686    
687        public SystemUsage getUsageManager() {
688            return usageManager;
689        }
690    
691        public int getMaxCheckpointMessageAddSize() {
692            return maxCheckpointMessageAddSize;
693        }
694    
695        public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
696            this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
697        }
698    
699        public int getMaxCheckpointWorkers() {
700            return maxCheckpointWorkers;
701        }
702    
703        public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
704            this.maxCheckpointWorkers = maxCheckpointWorkers;
705        }
706    
707        public boolean isUseExternalMessageReferences() {
708            return false;
709        }
710    
711        public void setUseExternalMessageReferences(boolean enable) {
712            if (enable) {
713                throw new IllegalArgumentException("The journal does not support message references.");
714            }
715        }
716    
717        public Packet toPacket(ByteSequence sequence) {
718            return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
719        }
720    
721        public ByteSequence toByteSequence(Packet packet) {
722            org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
723            return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
724        }
725    
726        public void setBrokerName(String brokerName) {
727            longTermPersistence.setBrokerName(brokerName);
728        }
729    
730        @Override
731        public String toString() {
732            return "JournalPersistenceAdapator(" + longTermPersistence + ")";
733        }
734    
735        public void setDirectory(File dir) {
736            this.directory=dir;
737        }
738        
739        public File getDirectory(){
740            return directory;
741        }
742        
743        public long size(){
744            return 0;
745        }
746    
747        public void setBrokerService(BrokerService brokerService) {
748            this.brokerService = brokerService;
749            PersistenceAdapter pa = getLongTermPersistence();
750            if( pa instanceof BrokerServiceAware ) {
751                ((BrokerServiceAware)pa).setBrokerService(brokerService);
752            }
753        }
754    
755        public long getLastProducerSequenceId(ProducerId id) {
756            return -1;
757        }
758    
759    }