001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.journal;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.util.ArrayList;
022 import java.util.HashSet;
023 import java.util.Iterator;
024 import java.util.Set;
025 import java.util.concurrent.Callable;
026 import java.util.concurrent.ConcurrentHashMap;
027 import java.util.concurrent.CountDownLatch;
028 import java.util.concurrent.FutureTask;
029 import java.util.concurrent.LinkedBlockingQueue;
030 import java.util.concurrent.ThreadFactory;
031 import java.util.concurrent.ThreadPoolExecutor;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.atomic.AtomicBoolean;
034 import org.apache.activeio.journal.InvalidRecordLocationException;
035 import org.apache.activeio.journal.Journal;
036 import org.apache.activeio.journal.JournalEventListener;
037 import org.apache.activeio.journal.RecordLocation;
038 import org.apache.activeio.packet.ByteArrayPacket;
039 import org.apache.activeio.packet.Packet;
040 import org.apache.activemq.broker.BrokerService;
041 import org.apache.activemq.broker.BrokerServiceAware;
042 import org.apache.activemq.broker.ConnectionContext;
043 import org.apache.activemq.command.ActiveMQDestination;
044 import org.apache.activemq.command.ActiveMQQueue;
045 import org.apache.activemq.command.ActiveMQTopic;
046 import org.apache.activemq.command.DataStructure;
047 import org.apache.activemq.command.JournalQueueAck;
048 import org.apache.activemq.command.JournalTopicAck;
049 import org.apache.activemq.command.JournalTrace;
050 import org.apache.activemq.command.JournalTransaction;
051 import org.apache.activemq.command.Message;
052 import org.apache.activemq.command.MessageAck;
053 import org.apache.activemq.command.ProducerId;
054 import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
055 import org.apache.activemq.openwire.OpenWireFormat;
056 import org.apache.activemq.store.MessageStore;
057 import org.apache.activemq.store.PersistenceAdapter;
058 import org.apache.activemq.store.TopicMessageStore;
059 import org.apache.activemq.store.TransactionStore;
060 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
061 import org.apache.activemq.store.journal.JournalTransactionStore.Tx;
062 import org.apache.activemq.store.journal.JournalTransactionStore.TxOperation;
063 import org.apache.activemq.thread.Scheduler;
064 import org.apache.activemq.thread.Task;
065 import org.apache.activemq.thread.TaskRunner;
066 import org.apache.activemq.thread.TaskRunnerFactory;
067 import org.apache.activemq.usage.SystemUsage;
068 import org.apache.activemq.usage.Usage;
069 import org.apache.activemq.usage.UsageListener;
070 import org.apache.activemq.util.ByteSequence;
071 import org.apache.activemq.util.IOExceptionSupport;
072 import org.apache.activemq.util.ThreadPoolUtils;
073 import org.apache.activemq.wireformat.WireFormat;
074 import org.slf4j.Logger;
075 import org.slf4j.LoggerFactory;
076
077 /**
078 * An implementation of {@link PersistenceAdapter} designed for use with a
079 * {@link Journal} and then check pointing asynchronously on a timeout with some
080 * other long term persistent storage.
081 *
082 * @org.apache.xbean.XBean
083 *
084 */
085 public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
086
087 private BrokerService brokerService;
088
089 protected Scheduler scheduler;
090 private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
091
092 private Journal journal;
093 private PersistenceAdapter longTermPersistence;
094
095 private final WireFormat wireFormat = new OpenWireFormat();
096
097 private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
098 private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
099
100 private SystemUsage usageManager;
101 private final long checkpointInterval = 1000 * 60 * 5;
102 private long lastCheckpointRequest = System.currentTimeMillis();
103 private long lastCleanup = System.currentTimeMillis();
104 private int maxCheckpointWorkers = 10;
105 private int maxCheckpointMessageAddSize = 1024 * 1024;
106
107 private final JournalTransactionStore transactionStore = new JournalTransactionStore(this);
108 private ThreadPoolExecutor checkpointExecutor;
109
110 private TaskRunner checkpointTask;
111 private CountDownLatch nextCheckpointCountDownLatch = new CountDownLatch(1);
112 private boolean fullCheckPoint;
113
114 private final AtomicBoolean started = new AtomicBoolean(false);
115
116 private final Runnable periodicCheckpointTask = createPeriodicCheckpointTask();
117
118 private TaskRunnerFactory taskRunnerFactory;
119 private File directory;
120
121 public JournalPersistenceAdapter() {
122 }
123
124 public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
125 setJournal(journal);
126 setTaskRunnerFactory(taskRunnerFactory);
127 setPersistenceAdapter(longTermPersistence);
128 }
129
130 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
131 this.taskRunnerFactory = taskRunnerFactory;
132 }
133
134 public void setJournal(Journal journal) {
135 this.journal = journal;
136 journal.setJournalEventListener(this);
137 }
138
139 public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
140 this.longTermPersistence = longTermPersistence;
141 }
142
143 final Runnable createPeriodicCheckpointTask() {
144 return new Runnable() {
145 public void run() {
146 long lastTime = 0;
147 synchronized (this) {
148 lastTime = lastCheckpointRequest;
149 }
150 if (System.currentTimeMillis() > lastTime + checkpointInterval) {
151 checkpoint(false, true);
152 }
153 }
154 };
155 }
156
157 /**
158 * @param usageManager The UsageManager that is controlling the
159 * destination's memory usage.
160 */
161 public void setUsageManager(SystemUsage usageManager) {
162 this.usageManager = usageManager;
163 longTermPersistence.setUsageManager(usageManager);
164 }
165
166 public Set<ActiveMQDestination> getDestinations() {
167 Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
168 destinations.addAll(queues.keySet());
169 destinations.addAll(topics.keySet());
170 return destinations;
171 }
172
173 private MessageStore createMessageStore(ActiveMQDestination destination) throws IOException {
174 if (destination.isQueue()) {
175 return createQueueMessageStore((ActiveMQQueue)destination);
176 } else {
177 return createTopicMessageStore((ActiveMQTopic)destination);
178 }
179 }
180
181 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
182 JournalMessageStore store = queues.get(destination);
183 if (store == null) {
184 MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destination);
185 store = new JournalMessageStore(this, checkpointStore, destination);
186 queues.put(destination, store);
187 }
188 return store;
189 }
190
191 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
192 JournalTopicMessageStore store = topics.get(destinationName);
193 if (store == null) {
194 TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
195 store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
196 topics.put(destinationName, store);
197 }
198 return store;
199 }
200
201 /**
202 * Cleanup method to remove any state associated with the given destination
203 *
204 * @param destination Destination to forget
205 */
206 public void removeQueueMessageStore(ActiveMQQueue destination) {
207 queues.remove(destination);
208 }
209
210 /**
211 * Cleanup method to remove any state associated with the given destination
212 *
213 * @param destination Destination to forget
214 */
215 public void removeTopicMessageStore(ActiveMQTopic destination) {
216 topics.remove(destination);
217 }
218
219 public TransactionStore createTransactionStore() throws IOException {
220 return transactionStore;
221 }
222
223 public long getLastMessageBrokerSequenceId() throws IOException {
224 return longTermPersistence.getLastMessageBrokerSequenceId();
225 }
226
227 public void beginTransaction(ConnectionContext context) throws IOException {
228 longTermPersistence.beginTransaction(context);
229 }
230
231 public void commitTransaction(ConnectionContext context) throws IOException {
232 longTermPersistence.commitTransaction(context);
233 }
234
235 public void rollbackTransaction(ConnectionContext context) throws IOException {
236 longTermPersistence.rollbackTransaction(context);
237 }
238
239 public synchronized void start() throws Exception {
240 if (!started.compareAndSet(false, true)) {
241 return;
242 }
243
244 checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
245 public boolean iterate() {
246 return doCheckpoint();
247 }
248 }, "ActiveMQ Journal Checkpoint Worker");
249
250 checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
251 public Thread newThread(Runnable runable) {
252 Thread t = new Thread(runable, "Journal checkpoint worker");
253 t.setPriority(7);
254 return t;
255 }
256 });
257 // checkpointExecutor.allowCoreThreadTimeOut(true);
258
259 this.usageManager.getMemoryUsage().addUsageListener(this);
260
261 if (longTermPersistence instanceof JDBCPersistenceAdapter) {
262 // Disabled periodic clean up as it deadlocks with the checkpoint
263 // operations.
264 ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
265 }
266
267 longTermPersistence.start();
268 createTransactionStore();
269 recover();
270
271 // Do a checkpoint periodically.
272 this.scheduler = new Scheduler("Journal Scheduler");
273 this.scheduler.start();
274 this.scheduler.executePeriodically(periodicCheckpointTask, checkpointInterval / 10);
275
276 }
277
278 public void stop() throws Exception {
279
280 this.usageManager.getMemoryUsage().removeUsageListener(this);
281 if (!started.compareAndSet(true, false)) {
282 return;
283 }
284
285 this.scheduler.cancel(periodicCheckpointTask);
286 this.scheduler.stop();
287
288 // Take one final checkpoint and stop checkpoint processing.
289 checkpoint(true, true);
290 checkpointTask.shutdown();
291 ThreadPoolUtils.shutdown(checkpointExecutor);
292 checkpointExecutor = null;
293
294 queues.clear();
295 topics.clear();
296
297 IOException firstException = null;
298 try {
299 journal.close();
300 } catch (Exception e) {
301 firstException = IOExceptionSupport.create("Failed to close journals: " + e, e);
302 }
303 longTermPersistence.stop();
304
305 if (firstException != null) {
306 throw firstException;
307 }
308 }
309
310 // Properties
311 // -------------------------------------------------------------------------
312 public PersistenceAdapter getLongTermPersistence() {
313 return longTermPersistence;
314 }
315
316 /**
317 * @return Returns the wireFormat.
318 */
319 public WireFormat getWireFormat() {
320 return wireFormat;
321 }
322
323 // Implementation methods
324 // -------------------------------------------------------------------------
325
326 /**
327 * The Journal give us a call back so that we can move old data out of the
328 * journal. Taking a checkpoint does this for us.
329 *
330 * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
331 */
332 public void overflowNotification(RecordLocation safeLocation) {
333 checkpoint(false, true);
334 }
335
336 /**
337 * When we checkpoint we move all the journalled data to long term storage.
338 *
339 */
340 public void checkpoint(boolean sync, boolean fullCheckpoint) {
341 try {
342 if (journal == null) {
343 throw new IllegalStateException("Journal is closed.");
344 }
345
346 long now = System.currentTimeMillis();
347 CountDownLatch latch = null;
348 synchronized (this) {
349 latch = nextCheckpointCountDownLatch;
350 lastCheckpointRequest = now;
351 if (fullCheckpoint) {
352 this.fullCheckPoint = true;
353 }
354 }
355
356 checkpointTask.wakeup();
357
358 if (sync) {
359 LOG.debug("Waking for checkpoint to complete.");
360 latch.await();
361 }
362 } catch (InterruptedException e) {
363 Thread.currentThread().interrupt();
364 LOG.warn("Request to start checkpoint failed: " + e, e);
365 }
366 }
367
368 public void checkpoint(boolean sync) {
369 checkpoint(sync, sync);
370 }
371
372 /**
373 * This does the actual checkpoint.
374 *
375 * @return
376 */
377 public boolean doCheckpoint() {
378 CountDownLatch latch = null;
379 boolean fullCheckpoint;
380 synchronized (this) {
381 latch = nextCheckpointCountDownLatch;
382 nextCheckpointCountDownLatch = new CountDownLatch(1);
383 fullCheckpoint = this.fullCheckPoint;
384 this.fullCheckPoint = false;
385 }
386 try {
387
388 LOG.debug("Checkpoint started.");
389 RecordLocation newMark = null;
390
391 ArrayList<FutureTask<RecordLocation>> futureTasks = new ArrayList<FutureTask<RecordLocation>>(queues.size() + topics.size());
392
393 //
394 // We do many partial checkpoints (fullCheckpoint==false) to move
395 // topic messages
396 // to long term store as soon as possible.
397 //
398 // We want to avoid doing that for queue messages since removes the
399 // come in the same
400 // checkpoint cycle will nullify the previous message add.
401 // Therefore, we only
402 // checkpoint queues on the fullCheckpoint cycles.
403 //
404 if (fullCheckpoint) {
405 Iterator<JournalMessageStore> iterator = queues.values().iterator();
406 while (iterator.hasNext()) {
407 try {
408 final JournalMessageStore ms = iterator.next();
409 FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
410 public RecordLocation call() throws Exception {
411 return ms.checkpoint();
412 }
413 });
414 futureTasks.add(task);
415 checkpointExecutor.execute(task);
416 } catch (Exception e) {
417 LOG.error("Failed to checkpoint a message store: " + e, e);
418 }
419 }
420 }
421
422 Iterator<JournalTopicMessageStore> iterator = topics.values().iterator();
423 while (iterator.hasNext()) {
424 try {
425 final JournalTopicMessageStore ms = iterator.next();
426 FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
427 public RecordLocation call() throws Exception {
428 return ms.checkpoint();
429 }
430 });
431 futureTasks.add(task);
432 checkpointExecutor.execute(task);
433 } catch (Exception e) {
434 LOG.error("Failed to checkpoint a message store: " + e, e);
435 }
436 }
437
438 try {
439 for (Iterator<FutureTask<RecordLocation>> iter = futureTasks.iterator(); iter.hasNext();) {
440 FutureTask<RecordLocation> ft = iter.next();
441 RecordLocation mark = ft.get();
442 // We only set a newMark on full checkpoints.
443 if (fullCheckpoint) {
444 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
445 newMark = mark;
446 }
447 }
448 }
449 } catch (Throwable e) {
450 LOG.error("Failed to checkpoint a message store: " + e, e);
451 }
452
453 if (fullCheckpoint) {
454 try {
455 if (newMark != null) {
456 LOG.debug("Marking journal at: " + newMark);
457 journal.setMark(newMark, true);
458 }
459 } catch (Exception e) {
460 LOG.error("Failed to mark the Journal: " + e, e);
461 }
462
463 if (longTermPersistence instanceof JDBCPersistenceAdapter) {
464 // We may be check pointing more often than the
465 // checkpointInterval if under high use
466 // But we don't want to clean up the db that often.
467 long now = System.currentTimeMillis();
468 if (now > lastCleanup + checkpointInterval) {
469 lastCleanup = now;
470 ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
471 }
472 }
473 }
474
475 LOG.debug("Checkpoint done.");
476 } finally {
477 latch.countDown();
478 }
479 synchronized (this) {
480 return this.fullCheckPoint;
481 }
482
483 }
484
485 /**
486 * @param location
487 * @return
488 * @throws IOException
489 */
490 public DataStructure readCommand(RecordLocation location) throws IOException {
491 try {
492 Packet packet = journal.read(location);
493 return (DataStructure)wireFormat.unmarshal(toByteSequence(packet));
494 } catch (InvalidRecordLocationException e) {
495 throw createReadException(location, e);
496 } catch (IOException e) {
497 throw createReadException(location, e);
498 }
499 }
500
501 /**
502 * Move all the messages that were in the journal into long term storage. We
503 * just replay and do a checkpoint.
504 *
505 * @throws IOException
506 * @throws IOException
507 * @throws InvalidRecordLocationException
508 * @throws IllegalStateException
509 */
510 private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, IOException {
511
512 RecordLocation pos = null;
513 int transactionCounter = 0;
514
515 LOG.info("Journal Recovery Started from: " + journal);
516 ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
517
518 // While we have records in the journal.
519 while ((pos = journal.getNextRecordLocation(pos)) != null) {
520 Packet data = journal.read(pos);
521 DataStructure c = (DataStructure)wireFormat.unmarshal(toByteSequence(data));
522
523 if (c instanceof Message) {
524 Message message = (Message)c;
525 JournalMessageStore store = (JournalMessageStore)createMessageStore(message.getDestination());
526 if (message.isInTransaction()) {
527 transactionStore.addMessage(store, message, pos);
528 } else {
529 store.replayAddMessage(context, message);
530 transactionCounter++;
531 }
532 } else {
533 switch (c.getDataStructureType()) {
534 case JournalQueueAck.DATA_STRUCTURE_TYPE: {
535 JournalQueueAck command = (JournalQueueAck)c;
536 JournalMessageStore store = (JournalMessageStore)createMessageStore(command.getDestination());
537 if (command.getMessageAck().isInTransaction()) {
538 transactionStore.removeMessage(store, command.getMessageAck(), pos);
539 } else {
540 store.replayRemoveMessage(context, command.getMessageAck());
541 transactionCounter++;
542 }
543 }
544 break;
545 case JournalTopicAck.DATA_STRUCTURE_TYPE: {
546 JournalTopicAck command = (JournalTopicAck)c;
547 JournalTopicMessageStore store = (JournalTopicMessageStore)createMessageStore(command.getDestination());
548 if (command.getTransactionId() != null) {
549 transactionStore.acknowledge(store, command, pos);
550 } else {
551 store.replayAcknowledge(context, command.getClientId(), command.getSubscritionName(), command.getMessageId());
552 transactionCounter++;
553 }
554 }
555 break;
556 case JournalTransaction.DATA_STRUCTURE_TYPE: {
557 JournalTransaction command = (JournalTransaction)c;
558 try {
559 // Try to replay the packet.
560 switch (command.getType()) {
561 case JournalTransaction.XA_PREPARE:
562 transactionStore.replayPrepare(command.getTransactionId());
563 break;
564 case JournalTransaction.XA_COMMIT:
565 case JournalTransaction.LOCAL_COMMIT:
566 Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
567 if (tx == null) {
568 break; // We may be trying to replay a commit
569 }
570 // that
571 // was already committed.
572
573 // Replay the committed operations.
574 tx.getOperations();
575 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
576 TxOperation op = (TxOperation)iter.next();
577 if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
578 op.store.replayAddMessage(context, (Message)op.data);
579 }
580 if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
581 op.store.replayRemoveMessage(context, (MessageAck)op.data);
582 }
583 if (op.operationType == TxOperation.ACK_OPERATION_TYPE) {
584 JournalTopicAck ack = (JournalTopicAck)op.data;
585 ((JournalTopicMessageStore)op.store).replayAcknowledge(context, ack.getClientId(), ack.getSubscritionName(), ack.getMessageId());
586 }
587 }
588 transactionCounter++;
589 break;
590 case JournalTransaction.LOCAL_ROLLBACK:
591 case JournalTransaction.XA_ROLLBACK:
592 transactionStore.replayRollback(command.getTransactionId());
593 break;
594 default:
595 throw new IOException("Invalid journal command type: " + command.getType());
596 }
597 } catch (IOException e) {
598 LOG.error("Recovery Failure: Could not replay: " + c + ", reason: " + e, e);
599 }
600 }
601 break;
602 case JournalTrace.DATA_STRUCTURE_TYPE:
603 JournalTrace trace = (JournalTrace)c;
604 LOG.debug("TRACE Entry: " + trace.getMessage());
605 break;
606 default:
607 LOG.error("Unknown type of record in transaction log which will be discarded: " + c);
608 }
609 }
610 }
611
612 RecordLocation location = writeTraceMessage("RECOVERED", true);
613 journal.setMark(location, true);
614
615 LOG.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
616 }
617
618 private IOException createReadException(RecordLocation location, Exception e) {
619 return IOExceptionSupport.create("Failed to read to journal for: " + location + ". Reason: " + e, e);
620 }
621
622 protected IOException createWriteException(DataStructure packet, Exception e) {
623 return IOExceptionSupport.create("Failed to write to journal for: " + packet + ". Reason: " + e, e);
624 }
625
626 protected IOException createWriteException(String command, Exception e) {
627 return IOExceptionSupport.create("Failed to write to journal for command: " + command + ". Reason: " + e, e);
628 }
629
630 protected IOException createRecoveryFailedException(Exception e) {
631 return IOExceptionSupport.create("Failed to recover from journal. Reason: " + e, e);
632 }
633
634 /**
635 * @param command
636 * @param sync
637 * @return
638 * @throws IOException
639 */
640 public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
641 if (started.get()) {
642 try {
643 return journal.write(toPacket(wireFormat.marshal(command)), sync);
644 } catch (IOException ioe) {
645 LOG.error("Cannot write to the journal", ioe);
646 brokerService.handleIOException(ioe);
647 throw ioe;
648 }
649 }
650 throw new IOException("closed");
651 }
652
653 private RecordLocation writeTraceMessage(String message, boolean sync) throws IOException {
654 JournalTrace trace = new JournalTrace();
655 trace.setMessage(message);
656 return writeCommand(trace, sync);
657 }
658
659 public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
660 newPercentUsage = (newPercentUsage / 10) * 10;
661 oldPercentUsage = (oldPercentUsage / 10) * 10;
662 if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
663 boolean sync = newPercentUsage >= 90;
664 checkpoint(sync, true);
665 }
666 }
667
668 public JournalTransactionStore getTransactionStore() {
669 return transactionStore;
670 }
671
672 public void deleteAllMessages() throws IOException {
673 try {
674 JournalTrace trace = new JournalTrace();
675 trace.setMessage("DELETED");
676 RecordLocation location = journal.write(toPacket(wireFormat.marshal(trace)), false);
677 journal.setMark(location, true);
678 LOG.info("Journal deleted: ");
679 } catch (IOException e) {
680 throw e;
681 } catch (Throwable e) {
682 throw IOExceptionSupport.create(e);
683 }
684 longTermPersistence.deleteAllMessages();
685 }
686
687 public SystemUsage getUsageManager() {
688 return usageManager;
689 }
690
691 public int getMaxCheckpointMessageAddSize() {
692 return maxCheckpointMessageAddSize;
693 }
694
695 public void setMaxCheckpointMessageAddSize(int maxCheckpointMessageAddSize) {
696 this.maxCheckpointMessageAddSize = maxCheckpointMessageAddSize;
697 }
698
699 public int getMaxCheckpointWorkers() {
700 return maxCheckpointWorkers;
701 }
702
703 public void setMaxCheckpointWorkers(int maxCheckpointWorkers) {
704 this.maxCheckpointWorkers = maxCheckpointWorkers;
705 }
706
707 public boolean isUseExternalMessageReferences() {
708 return false;
709 }
710
711 public void setUseExternalMessageReferences(boolean enable) {
712 if (enable) {
713 throw new IllegalArgumentException("The journal does not support message references.");
714 }
715 }
716
717 public Packet toPacket(ByteSequence sequence) {
718 return new ByteArrayPacket(new org.apache.activeio.packet.ByteSequence(sequence.data, sequence.offset, sequence.length));
719 }
720
721 public ByteSequence toByteSequence(Packet packet) {
722 org.apache.activeio.packet.ByteSequence sequence = packet.asByteSequence();
723 return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
724 }
725
726 public void setBrokerName(String brokerName) {
727 longTermPersistence.setBrokerName(brokerName);
728 }
729
730 @Override
731 public String toString() {
732 return "JournalPersistenceAdapator(" + longTermPersistence + ")";
733 }
734
735 public void setDirectory(File dir) {
736 this.directory=dir;
737 }
738
739 public File getDirectory(){
740 return directory;
741 }
742
743 public long size(){
744 return 0;
745 }
746
747 public void setBrokerService(BrokerService brokerService) {
748 this.brokerService = brokerService;
749 PersistenceAdapter pa = getLongTermPersistence();
750 if( pa instanceof BrokerServiceAware ) {
751 ((BrokerServiceAware)pa).setBrokerService(brokerService);
752 }
753 }
754
755 public long getLastProducerSequenceId(ProducerId id) {
756 return -1;
757 }
758
759 }