001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.journal;
018
019import java.io.File;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.Set;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.FutureTask;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.ThreadFactory;
032import java.util.concurrent.ThreadPoolExecutor;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035
036import org.apache.activeio.journal.InvalidRecordLocationException;
037import org.apache.activeio.journal.Journal;
038import org.apache.activeio.journal.JournalEventListener;
039import org.apache.activeio.journal.RecordLocation;
040import org.apache.activeio.packet.ByteArrayPacket;
041import org.apache.activeio.packet.Packet;
042import org.apache.activemq.broker.BrokerService;
043import org.apache.activemq.broker.BrokerServiceAware;
044import org.apache.activemq.broker.ConnectionContext;
045import org.apache.activemq.broker.scheduler.JobSchedulerStore;
046import org.apache.activemq.command.ActiveMQDestination;
047import org.apache.activemq.command.ActiveMQQueue;
048import org.apache.activemq.command.ActiveMQTopic;
049import org.apache.activemq.command.DataStructure;
050import org.apache.activemq.command.JournalQueueAck;
051import org.apache.activemq.command.JournalTopicAck;
052import org.apache.activemq.command.JournalTrace;
053import org.apache.activemq.command.JournalTransaction;
054import org.apache.activemq.command.Message;
055import org.apache.activemq.command.MessageAck;
056import org.apache.activemq.command.ProducerId;
057import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
058import org.apache.activemq.openwire.OpenWireFormat;
059import org.apache.activemq.store.MessageStore;
060import org.apache.activemq.store.PersistenceAdapter;
061import org.apache.activemq.store.TopicMessageStore;
062import org.apache.activemq.store.TransactionStore;
063import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
064import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
065import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
066import org.apache.activemq.thread.Scheduler;
067import org.apache.activemq.thread.Task;
068import org.apache.activemq.thread.TaskRunner;
069import org.apache.activemq.thread.TaskRunnerFactory;
070import org.apache.activemq.usage.SystemUsage;
071import org.apache.activemq.usage.Usage;
072import org.apache.activemq.usage.UsageListener;
073import org.apache.activemq.util.ByteSequence;
074import org.apache.activemq.util.IOExceptionSupport;
075import org.apache.activemq.util.ThreadPoolUtils;
076import org.apache.activemq.wireformat.WireFormat;
077import org.slf4j.Logger;
078import org.slf4j.LoggerFactory;
079
080/**
081 * An implementation of {@link PersistenceAdapter} designed for use with a
082 * {@link Journal} and then check pointing asynchronously on a timeout with some
083 * other long term persistent storage.
084 *
085 * @org.apache.xbean.XBean
086 *
087 */
088public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
089
090    private BrokerService brokerService;
091
092    protected Scheduler scheduler;
093    private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
094
095    private Journal journal;
096    private PersistenceAdapter longTermPersistence;
097
098    private final WireFormat wireFormat = new OpenWireFormat();
099
100    private final ConcurrentMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
101    private final ConcurrentMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
102
103    private SystemUsage usageManager;
104    private long checkpointInterval = 1000 * 60 * 5;
105    private long lastCheckpointRequest = System.currentTimeMillis();
106    private long lastCleanup = System.currentTimeMillis();
107    private int maxCheckpointWorkers = 10;
108    private int maxCheckpointMessageAddSize = 1024 * 1024;
109
110    private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
111    private ThreadPoolExecutor checkpointExecutor;
112
113    private TaskRunner checkpointTask;
114    private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
115    private boolean fullCheckPoint;
116
117    private final AtomicBoolean started = new AtomicBoolean(false);
118
119    private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
120
121    private TaskRunnerFactory taskRunnerFactory;
122    private File directory;
123
124    public JournalPersistenceAdapter() {
125    }
126
127    public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
128        setJournal(journal);
129        setTaskRunnerFactory(taskRunnerFactory);
130        setPersistenceAdapter(longTermPersistence);
131    }
132
133    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
134        this.taskRunnerFactory = taskRunnerFactory;
135    }
136
137    public void setJournal(Journal journal) {
138        this.journal = journal;
139        journal.setJournalEventListener(this);
140    }
141
142    public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
143        this.longTermPersistence = longTermPersistence;
144    }
145
146    final Runnable createPeriodicCheckpointTask() {
147        return new Runnable() {
148            @Override
149            public void run() {
150                long lastTime = 0;
151                synchronized (this) {
152                    lastTime = lastCheckpointRequest;
153                }
154                if (System.currentTimeMillis() > lastTime + checkpointInterval) {
155                    checkpoint(false, true);
156                }
157            }
158        };
159    }
160
161    /**
162     * @param usageManager The UsageManager that is controlling the
163     *                destination's memory usage.
164     */
165    @Override
166    public void setUsageManager(SystemUsage usageManager) {
167        this.usageManager = usageManager;
168        longTermPersistence.setUsageManager(usageManager);
169    }
170
171    @Override
172    public Set<ActiveMQDestination> getDestinations() {
173        Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
174        destinations.addAll(queues.keySet());
175        destinations.addAll(topics.keySet());
176        return destinations;
177    }
178
179    private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
180        if (destination.isQueue()) {
181            return createQueueMessageStore((ActiveMQQueue)destination);
182        } else {
183            return createTopicMessageStore((ActiveMQTopic)destination);
184        }
185    }
186
187    @Override
188    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
189        JournalMessageStore store = queues.get(destination);
190        if (store == null) {
191            MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
192            store = new JournalMessageStore(this, checkpointStore, destination);
193            queues.put(destination, store);
194        }
195        return store;
196    }
197
198    @Override
199    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
200        JournalTopicMessageStore store = topics.get(destinationName);
201        if (store == null) {
202            TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
203            store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
204            topics.put(destinationName, store);
205        }
206        return store;
207    }
208
209    /**
210     * Cleanup method to remove any state associated with the given destination
211     *
212     * @param destination Destination to forget
213     */
214    @Override
215    public void removeQueueMessageStore(ActiveMQQueue destination) {
216        queues.remove(destination);
217    }
218
219    /**
220     * Cleanup method to remove any state associated with the given destination
221     *
222     * @param destination Destination to forget
223     */
224    @Override
225    public void removeTopicMessageStore(ActiveMQTopic destination) {
226        topics.remove(destination);
227    }
228
229    @Override
230    public TransactionStore createTransactionStore() throws IOException {
231        return transactionStore;
232    }
233
234    @Override
235    public long getLastMessageBrokerSequenceId() throws IOException {
236        return longTermPersistence.getLastMessageBrokerSequenceId();
237    }
238
239    @Override
240    public void beginTransaction(ConnectionContext context) throws IOException {
241        longTermPersistence.beginTransaction(context);
242    }
243
244    @Override
245    public void commitTransaction(ConnectionContext context) throws IOException {
246        longTermPersistence.commitTransaction(context);
247    }
248
249    @Override
250    public void rollbackTransaction(ConnectionContext context) throws IOException {
251        longTermPersistence.rollbackTransaction(context);
252    }
253
254    @Override
255    public synchronized void start() throws Exception {
256        if (!started.compareAndSet(false, true)) {
257            return;
258        }
259
260        if( brokerService!=null ) {
261          wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
262        }
263
264        checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
265            @Override
266            public boolean iterate() {
267                return doCheckpoint();
268            }
269        }, "ActiveMQ Journal Checkpoint Worker");
270
271        checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
272            @Override
273            public Thread newThread(Runnable runable) {
274                Thread t = new Thread(runable, "Journal checkpoint worker");
275                t.setPriority(7);
276                return t;
277            }
278        });
279        // checkpointExecutor.allowCoreThreadTimeOut(true);
280
281        this.usageManager.getMemoryUsage().addUsageListener(this);
282
283        if (longTermPersistence instanceof JDBCPersistenceAdapter) {
284            // Disabled periodic clean up as it deadlocks with the checkpoint
285            // operations.
286            ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
287        }
288
289        longTermPersistence.start();
290        createTransactionStore();
291        recover();
292
293        // Do a checkpoint periodically.
294        this.scheduler = new Scheduler("Journal Scheduler");
295        this.scheduler.start();
296        this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
297
298    }
299
300    @Override
301    public void stop() throws Exception {
302
303        this.usageManager.getMemoryUsage().removeUsageListener(this);
304        if (!started.compareAndSet(true, false)) {
305            return;
306        }
307
308        this.scheduler.cancel(periodicCheckpointTask);
309        this.scheduler.stop();
310
311        // Take one final checkpoint and stop checkpoint processing.
312        checkpoint(true, true);
313        checkpointTask.shutdown();
314        ThreadPoolUtils.shutdown(checkpointExecutor);
315        checkpointExecutor = null;
316
317        queues.clear();
318        topics.clear();
319
320        IOException firstException = null;
321        try {
322            journal.close();
323        } catch (Exception e) {
324            firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
325        }
326        longTermPersistence.stop();
327
328        if (firstException != null) {
329            throw firstException;
330        }
331    }
332
333    // Properties
334    // -------------------------------------------------------------------------
335    public PersistenceAdapter getLongTermPersistence() {
336        return longTermPersistence;
337    }
338
339    /**
340     * @return Returns the wireFormat.
341     */
342    public WireFormat getWireFormat() {
343        return wireFormat;
344    }
345
346    // Implementation methods
347    // -------------------------------------------------------------------------
348
349    /**
350     * The Journal give us a call back so that we can move old data out of the
351     * journal. Taking a checkpoint does this for us.
352     *
353     * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
354     */
355    @Override
356    public void overflowNotification(RecordLocation safeLocation) {
357        checkpoint(false, true);
358    }
359
360    /**
361     * When we checkpoint we move all the journalled data to long term storage.
362     *
363     */
364    public void checkpoint(boolean sync, boolean fullCheckpoint) {
365        try {
366            if (journal == null) {
367                throw new IllegalStateException("Journal is closed.");
368            }
369
370            long now = System.currentTimeMillis();
371            CountDownLatch latch = null;
372            synchronized (this) {
373                latch = nextCheckpointCountDownLatch;
374                lastCheckpointRequest = now;
375                if (fullCheckpoint) {
376                    this.fullCheckPoint = true;
377                }
378            }
379
380            checkpointTask.wakeup();
381
382            if (sync) {
383                LOG.debug("Waking for checkpoint to complete.");
384                latch.await();
385            }
386        } catch (InterruptedException e) {
387            Thread.currentThread().interrupt();
388            LOG.warn("Request to start checkpoint failed: " + e, e);
389        }
390    }
391
392    @Override
393    public void checkpoint(boolean sync) {
394        checkpoint(sync, sync);
395    }
396
397    /**
398     * This does the actual checkpoint.
399     *
400     * @return
401     */
402    public boolean doCheckpoint() {
403        CountDownLatch latch = null;
404        boolean fullCheckpoint;
405        synchronized (this) {
406            latch = nextCheckpointCountDownLatch;
407            nextCheckpointCountDownLatch = new CountDownLatch(1);
408            fullCheckpoint = this.fullCheckPoint;
409            this.fullCheckPoint = false;
410        }
411        try {
412
413            LOG.debug("Checkpoint started.");
414            RecordLocation newMark = null;
415
416            ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
417
418            //
419            // We do many partial checkpoints (fullCheckpoint==false) to move
420            // topic messages
421            // to long term store as soon as possible.
422            //
423            // We want to avoid doing that for queue messages since removes the
424            // come in the same
425            // checkpoint cycle will nullify the previous message add.
426            // Therefore, we only
427            // checkpoint queues on the fullCheckpoint cycles.
428            //
429            if (fullCheckpoint) {
430                Iterator<JournalMessageStore> iterator = queues.values().iterator();
431                while (iterator.hasNext()) {
432                    try {
433                        final JournalMessageStore ms = iterator.next();
434                        FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
435                            @Override
436                            public RecordLocation call() throws Exception {
437                                return ms.checkpoint();
438                            }
439                        });
440                        futureTasks.add(task);
441                        checkpointExecutor.execute(task);
442                    } catch (Exception e) {
443                        LOG.error("Failed to checkpoint a message store: " + e, e);
444                    }
445                }
446            }
447
448            Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
449            while (iterator.hasNext()) {
450                try {
451                    final JournalTopicMessageStore ms = iterator.next();
452                    FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
453                        @Override
454                        public RecordLocation call() throws Exception {
455                            return ms.checkpoint();
456                        }
457                    });
458                    futureTasks.add(task);
459                    checkpointExecutor.execute(task);
460                } catch (Exception e) {
461                    LOG.error("Failed to checkpoint a message store: " + e, e);
462                }
463            }
464
465            try {
466                for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
467                    FutureTask<RecordLocation> ft = iter.next();
468                    RecordLocation mark = ft.get();
469                    // We only set a newMark on full checkpoints.
470                    if (fullCheckpoint) {
471                        if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
472                            newMark = mark;
473                        }
474                    }
475                }
476            } catch (Throwable e) {
477                LOG.error("Failed to checkpoint a message store: " + e, e);
478            }
479
480            if (fullCheckpoint) {
481                try {
482                    if (newMark != null) {
483                        LOG.debug("Marking journal at: " + newMark);
484                        journal.setMark(newMark, true);
485                    }
486                } catch (Exception e) {
487                    LOG.error("Failed to mark the Journal: " + e, e);
488                }
489
490                if (longTermPersistence instanceof JDBCPersistenceAdapter) {
491                    // We may be check pointing more often than the
492                    // checkpointInterval if under high use
493                    // But we don't want to clean up the db that often.
494                    long now = System.currentTimeMillis();
495                    if (now > lastCleanup + checkpointInterval) {
496                        lastCleanup = now;
497                        ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
498                    }
499                }
500            }
501
502            LOG.debug("Checkpoint done.");
503        } finally {
504            latch.countDown();
505        }
506        synchronized (this) {
507            return this.fullCheckPoint;
508        }
509
510    }
511
512    /**
513     * @param location
514     * @return
515     * @throws IOException
516     */
517    public DataStructure readCommand(RecordLocation location) throws IOException {
518        try {
519            Packet packet = journal.read(location);
520            return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
521        } catch (InvalidRecordLocationException e) {
522            throw createReadException(location, e);
523        } catch (IOException e) {
524            throw createReadException(location, e);
525        }
526    }
527
528    /**
529     * Move all the messages that were in the journal into long term storage. We
530     * just replay and do a checkpoint.
531     *
532     * @throws IOException
533     * @throws IOException
534     * @throws InvalidRecordLocationException
535     * @throws IllegalStateException
536     */
537    private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
538
539        RecordLocation pos = null;
540        int transactionCounter = 0;
541
542        LOG.info("Journal Recovery Started from: " + journal);
543        ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
544
545        // While we have records in the journal.
546        while ((pos = journal.getNextRecordLocation(pos)) != null) {
547            Packet data = journal.read(pos);
548            DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
549
550            if (c instanceof Message) {
551                Message message = (Message)c;
552                JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
553                if (message.isInTransaction()) {
554                    transactionStore.addMessage(store, message, pos);
555                } else {
556                    store.replayAddMessage(context, message);
557                    transactionCounter++;
558                }
559            } else {
560                switch (c.getDataStructureType()) {
561                case JournalQueueAck.DATA_STRUCTURE_TYPE: {
562                    JournalQueueAck command = (JournalQueueAck)c;
563                    JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
564                    if (command.getMessageAck().isInTransaction()) {
565                        transactionStore.removeMessage(store, command.getMessageAck(), pos);
566                    } else {
567                        store.replayRemoveMessage(context, command.getMessageAck());
568                        transactionCounter++;
569                    }
570                }
571                    break;
572                case JournalTopicAck.DATA_STRUCTURE_TYPE: {
573                    JournalTopicAck command = (JournalTopicAck)c;
574                    JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
575                    if (command.getTransactionId() != null) {
576                        transactionStore.acknowledge(store, command, pos);
577                    } else {
578                        store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
579                        transactionCounter++;
580                    }
581                }
582                    break;
583                case JournalTransaction.DATA_STRUCTURE_TYPE: {
584                    JournalTransaction command = (JournalTransaction)c;
585                    try {
586                        // Try to replay the packet.
587                        switch (command.getType()) {
588                        case JournalTransaction.XA_PREPARE:
589                            transactionStore.replayPrepare(command.getTransactionId());
590                            break;
591                        case JournalTransaction.XA_COMMIT:
592                        case JournalTransaction.LOCAL_COMMIT:
593                            Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
594                            if (tx == null) {
595                                break; // We may be trying to replay a commit
596                            }
597                            // that
598                            // was already committed.
599
600                            // Replay the committed operations.
601                            tx.getOperations();
602                            for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
603                                TxOperation op = (TxOperation)iter.next();
604                                if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
605                                    op.store.replayAddMessage(context, (Message)op.data);
606                                }
607                                if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
608                                    op.store.replayRemoveMessage(context, (MessageAck)op.data);
609                                }
610                                if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
611                                    JournalTopicAck ack = (JournalTopicAck)op.data;
612                                    ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
613                                }
614                            }
615                            transactionCounter++;
616                            break;
617                        case JournalTransaction.LOCAL_ROLLBACK:
618                        case JournalTransaction.XA_ROLLBACK:
619                            transactionStore.replayRollback(command.getTransactionId());
620                            break;
621                        default:
622                            throw new IOException("Invalid journal command type: " + command.getType());
623                        }
624                    } catch (IOException e) {
625                        LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
626                    }
627                }
628                    break;
629                case JournalTrace.DATA_STRUCTURE_TYPE:
630                    JournalTrace trace = (JournalTrace)c;
631                    LOG.debug("TRACE Entry: " + trace.getMessage());
632                    break;
633                default:
634                    LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
635                }
636            }
637        }
638
639        RecordLocation location = writeTraceMessage("RECOVERED", true);
640        journal.setMark(location, true);
641
642        LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
643    }
644
645    private IOException createReadException(RecordLocation location, Exception e) {
646        return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
647    }
648
649    protected IOException createWriteException(DataStructure packet, Exception e) {
650        return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
651    }
652
653    protected IOException createWriteException(String command, Exception e) {
654        return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
655    }
656
657    protected IOException createRecoveryFailedException(Exception e) {
658        return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
659    }
660
661    /**
662     * @param command
663     * @param sync
664     * @return
665     * @throws IOException
666     */
667    public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
668        if (started.get()) {
669            try {
670                return journal.write(toPacket(wireFormat.marshal(command)), sync);
671            } catch (IOException ioe) {
672                LOG.error("Cannot write to the journal", ioe);
673                brokerService.handleIOException(ioe);
674                throw ioe;
675            }
676        }
677        throw new IOException("closed");
678    }
679
680    private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
681        JournalTrace trace = new JournalTrace();
682        trace.setMessage(message);
683        return writeCommand(trace, sync);
684    }
685
686    @Override
687    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
688        newPercentUsage = (newPercentUsage / 10) * 10;
689        oldPercentUsage = (oldPercentUsage / 10) * 10;
690        if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
691            boolean sync = newPercentUsage >= 90;
692            checkpoint(sync, true);
693        }
694    }
695
696    public JournalTransactionStore getTransactionStore() {
697        return transactionStore;
698    }
699
700    @Override
701    public void deleteAllMessages() throws IOException {
702        try {
703            JournalTrace trace = new JournalTrace();
704            trace.setMessage("DELETED");
705            RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
706            journal.setMark(location, true);
707            LOG.info("Journal deleted: ");
708        } catch (IOException e) {
709            throw e;
710        } catch (Throwable e) {
711            throw IOExceptionSupport.create(e);
712        }
713        longTermPersistence.deleteAllMessages();
714    }
715
716    public SystemUsage getUsageManager() {
717        return usageManager;
718    }
719
720    public int getMaxCheckpointMessageAddSize() {
721        return maxCheckpointMessageAddSize;
722    }
723
724    public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
725        this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
726    }
727
728    public int getMaxCheckpointWorkers() {
729        return maxCheckpointWorkers;
730    }
731
732    public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
733        this.maxCheckpointWorkers = maxCheckpointWorkers;
734    }
735
736    public long getCheckpointInterval() {
737        return checkpointInterval;
738    }
739
740    public void setCheckpointInterval(long checkpointInterval) {
741        this.checkpointInterval = checkpointInterval;
742    }
743
744    public boolean isUseExternalMessageReferences() {
745        return false;
746    }
747
748    public void setUseExternalMessageReferences(boolean enable) {
749        if (enable) {
750            throw new IllegalArgumentException("The journal does not support message references.");
751        }
752    }
753
754    public Packet toPacket(ByteSequence sequence) {
755        return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
756    }
757
758    public ByteSequence toByteSequence(Packet packet) {
759        org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
760        return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
761    }
762
763    @Override
764    public void setBrokerName(String brokerName) {
765        longTermPersistence.setBrokerName(brokerName);
766    }
767
768    @Override
769    public String toString() {
770        return "JournalPersistenceAdapter(" + longTermPersistence + ")";
771    }
772
773    @Override
774    public void setDirectory(File dir) {
775        this.directory=dir;
776    }
777
778    @Override
779    public File getDirectory(){
780        return directory;
781    }
782
783    @Override
784    public long size(){
785        return 0;
786    }
787
788    @Override
789    public void setBrokerService(BrokerService brokerService) {
790        this.brokerService = brokerService;
791        PersistenceAdapter pa = getLongTermPersistence();
792        if( pa instanceof BrokerServiceAware ) {
793            ((BrokerServiceAware)pa).setBrokerService(brokerService);
794        }
795    }
796
797    @Override
798    public long getLastProducerSequenceId(ProducerId id) {
799        return -1;
800    }
801
802    @Override
803    public void allowIOResumption() {
804        longTermPersistence.allowIOResumption();
805    }
806
807    @Override
808    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
809        return longTermPersistence.createJobSchedulerStore();
810    }
811
812}