001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;
020
021 import java.io.File;
022 import java.io.IOException;
023 import java.util.Set;
024 import java.util.concurrent.Callable;
025
026 import javax.management.ObjectName;
027
028 import org.apache.activemq.broker.BrokerService;
029 import org.apache.activemq.broker.ConnectionContext;
030 import org.apache.activemq.broker.LockableServiceSupport;
031 import org.apache.activemq.broker.Locker;
032 import org.apache.activemq.broker.jmx.AnnotatedMBean;
033 import org.apache.activemq.broker.jmx.PersistenceAdapterView;
034 import org.apache.activemq.command.ActiveMQDestination;
035 import org.apache.activemq.command.ActiveMQQueue;
036 import org.apache.activemq.command.ActiveMQTopic;
037 import org.apache.activemq.command.LocalTransactionId;
038 import org.apache.activemq.command.ProducerId;
039 import org.apache.activemq.command.TransactionId;
040 import org.apache.activemq.command.XATransactionId;
041 import org.apache.activemq.protobuf.Buffer;
042 import org.apache.activemq.store.JournaledStore;
043 import org.apache.activemq.store.MessageStore;
044 import org.apache.activemq.store.PersistenceAdapter;
045 import org.apache.activemq.store.SharedFileLocker;
046 import org.apache.activemq.store.TopicMessageStore;
047 import org.apache.activemq.store.TransactionStore;
048 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
049 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
050 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
051 import org.apache.activemq.usage.SystemUsage;
052 import org.apache.activemq.util.ServiceStopper;
053
054 /**
055 * An implementation of {@link PersistenceAdapter} designed for use with
056 * KahaDB - Embedded Lightweight Non-Relational Database
057 *
058 * @org.apache.xbean.XBean element="kahaDB"
059 *
060 */
061 public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore {
062 private final KahaDBStore letter = new KahaDBStore();
063
064 /**
065 * @param context
066 * @throws IOException
067 * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
068 */
069 @Override
070 public void beginTransaction(ConnectionContext context) throws IOException {
071 this.letter.beginTransaction(context);
072 }
073
074 /**
075 * @param sync
076 * @throws IOException
077 * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
078 */
079 @Override
080 public void checkpoint(boolean sync) throws IOException {
081 this.letter.checkpoint(sync);
082 }
083
084 /**
085 * @param context
086 * @throws IOException
087 * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
088 */
089 @Override
090 public void commitTransaction(ConnectionContext context) throws IOException {
091 this.letter.commitTransaction(context);
092 }
093
094 /**
095 * @param destination
096 * @return MessageStore
097 * @throws IOException
098 * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
099 */
100 @Override
101 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
102 return this.letter.createQueueMessageStore(destination);
103 }
104
105 /**
106 * @param destination
107 * @return TopicMessageStore
108 * @throws IOException
109 * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
110 */
111 @Override
112 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
113 return this.letter.createTopicMessageStore(destination);
114 }
115
116 /**
117 * @return TransactionStore
118 * @throws IOException
119 * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
120 */
121 @Override
122 public TransactionStore createTransactionStore() throws IOException {
123 return this.letter.createTransactionStore();
124 }
125
126 /**
127 * @throws IOException
128 * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
129 */
130 @Override
131 public void deleteAllMessages() throws IOException {
132 this.letter.deleteAllMessages();
133 }
134
135 /**
136 * @return destinations
137 * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
138 */
139 @Override
140 public Set<ActiveMQDestination> getDestinations() {
141 return this.letter.getDestinations();
142 }
143
144 /**
145 * @return lastMessageBrokerSequenceId
146 * @throws IOException
147 * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
148 */
149 @Override
150 public long getLastMessageBrokerSequenceId() throws IOException {
151 return this.letter.getLastMessageBrokerSequenceId();
152 }
153
154 @Override
155 public long getLastProducerSequenceId(ProducerId id) throws IOException {
156 return this.letter.getLastProducerSequenceId(id);
157 }
158
159 /**
160 * @param destination
161 * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
162 */
163 @Override
164 public void removeQueueMessageStore(ActiveMQQueue destination) {
165 this.letter.removeQueueMessageStore(destination);
166 }
167
168 /**
169 * @param destination
170 * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
171 */
172 @Override
173 public void removeTopicMessageStore(ActiveMQTopic destination) {
174 this.letter.removeTopicMessageStore(destination);
175 }
176
177 /**
178 * @param context
179 * @throws IOException
180 * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
181 */
182 @Override
183 public void rollbackTransaction(ConnectionContext context) throws IOException {
184 this.letter.rollbackTransaction(context);
185 }
186
187 /**
188 * @param brokerName
189 * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
190 */
191 @Override
192 public void setBrokerName(String brokerName) {
193 this.letter.setBrokerName(brokerName);
194 }
195
196 /**
197 * @param usageManager
198 * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
199 */
200 @Override
201 public void setUsageManager(SystemUsage usageManager) {
202 this.letter.setUsageManager(usageManager);
203 }
204
205 /**
206 * @return the size of the store
207 * @see org.apache.activemq.store.PersistenceAdapter#size()
208 */
209 @Override
210 public long size() {
211 return this.letter.size();
212 }
213
214 /**
215 * @throws Exception
216 * @see org.apache.activemq.Service#start()
217 */
218 @Override
219 public void doStart() throws Exception {
220 this.letter.start();
221
222 if (brokerService != null && brokerService.isUseJmx()) {
223 PersistenceAdapterView view = new PersistenceAdapterView(this);
224 view.setInflightTransactionViewCallable(new Callable<String>() {
225 @Override
226 public String call() throws Exception {
227 return letter.getTransactions();
228 }
229 });
230 view.setDataViewCallable(new Callable<String>() {
231 @Override
232 public String call() throws Exception {
233 return letter.getJournal().getFileMap().keySet().toString();
234 }
235 });
236 AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view,
237 createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString()));
238 }
239 }
240
241 /**
242 * @throws Exception
243 * @see org.apache.activemq.Service#stop()
244 */
245 @Override
246 public void doStop(ServiceStopper stopper) throws Exception {
247 this.letter.stop();
248
249 if (brokerService != null && brokerService.isUseJmx()) {
250 ObjectName brokerObjectName = brokerService.getBrokerObjectName();
251 brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString()));
252 }
253 }
254
255 /**
256 * Get the journalMaxFileLength
257 *
258 * @return the journalMaxFileLength
259 */
260 @Override
261 public int getJournalMaxFileLength() {
262 return this.letter.getJournalMaxFileLength();
263 }
264
265 /**
266 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
267 * be used
268 *
269 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
270 */
271 public void setJournalMaxFileLength(int journalMaxFileLength) {
272 this.letter.setJournalMaxFileLength(journalMaxFileLength);
273 }
274
275 /**
276 * Set the max number of producers (LRU cache) to track for duplicate sends
277 */
278 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
279 this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
280 }
281
282 public int getMaxFailoverProducersToTrack() {
283 return this.letter.getMaxFailoverProducersToTrack();
284 }
285
286 /**
287 * set the audit window depth for duplicate suppression (should exceed the max transaction
288 * batch)
289 */
290 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
291 this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
292 }
293
294 public int getFailoverProducersAuditDepth() {
295 return this.letter.getFailoverProducersAuditDepth();
296 }
297
298 /**
299 * Get the checkpointInterval
300 *
301 * @return the checkpointInterval
302 */
303 public long getCheckpointInterval() {
304 return this.letter.getCheckpointInterval();
305 }
306
307 /**
308 * Set the checkpointInterval
309 *
310 * @param checkpointInterval
311 * the checkpointInterval to set
312 */
313 public void setCheckpointInterval(long checkpointInterval) {
314 this.letter.setCheckpointInterval(checkpointInterval);
315 }
316
317 /**
318 * Get the cleanupInterval
319 *
320 * @return the cleanupInterval
321 */
322 public long getCleanupInterval() {
323 return this.letter.getCleanupInterval();
324 }
325
326 /**
327 * Set the cleanupInterval
328 *
329 * @param cleanupInterval
330 * the cleanupInterval to set
331 */
332 public void setCleanupInterval(long cleanupInterval) {
333 this.letter.setCleanupInterval(cleanupInterval);
334 }
335
336 /**
337 * Get the indexWriteBatchSize
338 *
339 * @return the indexWriteBatchSize
340 */
341 public int getIndexWriteBatchSize() {
342 return this.letter.getIndexWriteBatchSize();
343 }
344
345 /**
346 * Set the indexWriteBatchSize
347 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
348 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
349 * @param indexWriteBatchSize
350 * the indexWriteBatchSize to set
351 */
352 public void setIndexWriteBatchSize(int indexWriteBatchSize) {
353 this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
354 }
355
356 /**
357 * Get the journalMaxWriteBatchSize
358 *
359 * @return the journalMaxWriteBatchSize
360 */
361 public int getJournalMaxWriteBatchSize() {
362 return this.letter.getJournalMaxWriteBatchSize();
363 }
364
365 /**
366 * Set the journalMaxWriteBatchSize
367 * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
368 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
369 * @param journalMaxWriteBatchSize
370 * the journalMaxWriteBatchSize to set
371 */
372 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
373 this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
374 }
375
376 /**
377 * Get the enableIndexWriteAsync
378 *
379 * @return the enableIndexWriteAsync
380 */
381 public boolean isEnableIndexWriteAsync() {
382 return this.letter.isEnableIndexWriteAsync();
383 }
384
385 /**
386 * Set the enableIndexWriteAsync
387 *
388 * @param enableIndexWriteAsync
389 * the enableIndexWriteAsync to set
390 */
391 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
392 this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
393 }
394
395 /**
396 * Get the directory
397 *
398 * @return the directory
399 */
400 @Override
401 public File getDirectory() {
402 return this.letter.getDirectory();
403 }
404
405 /**
406 * @param dir
407 * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
408 */
409 @Override
410 public void setDirectory(File dir) {
411 this.letter.setDirectory(dir);
412 }
413
414 /**
415 * Get the enableJournalDiskSyncs
416 *
417 * @return the enableJournalDiskSyncs
418 */
419 public boolean isEnableJournalDiskSyncs() {
420 return this.letter.isEnableJournalDiskSyncs();
421 }
422
423 /**
424 * Set the enableJournalDiskSyncs
425 *
426 * @param enableJournalDiskSyncs
427 * the enableJournalDiskSyncs to set
428 */
429 public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
430 this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
431 }
432
433 /**
434 * Get the indexCacheSize
435 *
436 * @return the indexCacheSize
437 */
438 public int getIndexCacheSize() {
439 return this.letter.getIndexCacheSize();
440 }
441
442 /**
443 * Set the indexCacheSize
444 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
445 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
446 * @param indexCacheSize
447 * the indexCacheSize to set
448 */
449 public void setIndexCacheSize(int indexCacheSize) {
450 this.letter.setIndexCacheSize(indexCacheSize);
451 }
452
453 /**
454 * Get the ignoreMissingJournalfiles
455 *
456 * @return the ignoreMissingJournalfiles
457 */
458 public boolean isIgnoreMissingJournalfiles() {
459 return this.letter.isIgnoreMissingJournalfiles();
460 }
461
462 /**
463 * Set the ignoreMissingJournalfiles
464 *
465 * @param ignoreMissingJournalfiles
466 * the ignoreMissingJournalfiles to set
467 */
468 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
469 this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
470 }
471
472 public boolean isChecksumJournalFiles() {
473 return letter.isChecksumJournalFiles();
474 }
475
476 public boolean isCheckForCorruptJournalFiles() {
477 return letter.isCheckForCorruptJournalFiles();
478 }
479
480 public void setChecksumJournalFiles(boolean checksumJournalFiles) {
481 letter.setChecksumJournalFiles(checksumJournalFiles);
482 }
483
484 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
485 letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
486 }
487
488 @Override
489 public void setBrokerService(BrokerService brokerService) {
490 super.setBrokerService(brokerService);
491 letter.setBrokerService(brokerService);
492 }
493
494 public boolean isArchiveDataLogs() {
495 return letter.isArchiveDataLogs();
496 }
497
498 public void setArchiveDataLogs(boolean archiveDataLogs) {
499 letter.setArchiveDataLogs(archiveDataLogs);
500 }
501
502 public File getDirectoryArchive() {
503 return letter.getDirectoryArchive();
504 }
505
506 public void setDirectoryArchive(File directoryArchive) {
507 letter.setDirectoryArchive(directoryArchive);
508 }
509
510 public boolean isConcurrentStoreAndDispatchQueues() {
511 return letter.isConcurrentStoreAndDispatchQueues();
512 }
513
514 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
515 letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
516 }
517
518 public boolean isConcurrentStoreAndDispatchTopics() {
519 return letter.isConcurrentStoreAndDispatchTopics();
520 }
521
522 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
523 letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
524 }
525
526 public int getMaxAsyncJobs() {
527 return letter.getMaxAsyncJobs();
528 }
529 /**
530 * @param maxAsyncJobs
531 * the maxAsyncJobs to set
532 */
533 public void setMaxAsyncJobs(int maxAsyncJobs) {
534 letter.setMaxAsyncJobs(maxAsyncJobs);
535 }
536
537 /**
538 * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
539 *
540 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
541 */
542 @Deprecated
543 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
544 getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
545 }
546
547 public boolean getForceRecoverIndex() {
548 return letter.getForceRecoverIndex();
549 }
550
551 public void setForceRecoverIndex(boolean forceRecoverIndex) {
552 letter.setForceRecoverIndex(forceRecoverIndex);
553 }
554
555 public boolean isArchiveCorruptedIndex() {
556 return letter.isArchiveCorruptedIndex();
557 }
558
559 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
560 letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
561 }
562
563 /**
564 * When true, persist the redelivery status such that the message redelivery flag can survive a broker failure
565 * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean) true
566 */
567 public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
568 letter.setRewriteOnRedelivery(rewriteOnRedelivery);
569 }
570
571 public boolean isRewriteOnRedelivery() {
572 return letter.isRewriteOnRedelivery();
573 }
574
575 public float getIndexLFUEvictionFactor() {
576 return letter.getIndexLFUEvictionFactor();
577 }
578
579 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
580 letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
581 }
582
583 public boolean isUseIndexLFRUEviction() {
584 return letter.isUseIndexLFRUEviction();
585 }
586
587 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
588 letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
589 }
590
591 public void setEnableIndexDiskSyncs(boolean diskSyncs) {
592 letter.setEnableIndexDiskSyncs(diskSyncs);
593 }
594
595 public boolean isEnableIndexDiskSyncs() {
596 return letter.isEnableIndexDiskSyncs();
597 }
598
599 public void setEnableIndexRecoveryFile(boolean enable) {
600 letter.setEnableIndexRecoveryFile(enable);
601 }
602
603 public boolean isEnableIndexRecoveryFile() {
604 return letter.isEnableIndexRecoveryFile();
605 }
606
607 public void setEnableIndexPageCaching(boolean enable) {
608 letter.setEnableIndexPageCaching(enable);
609 }
610
611 public boolean isEnableIndexPageCaching() {
612 return letter.isEnableIndexPageCaching();
613 }
614
615 public KahaDBStore getStore() {
616 return letter;
617 }
618
619 public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
620 if (txid == null) {
621 return null;
622 }
623 KahaTransactionInfo rc = new KahaTransactionInfo();
624
625 if (txid.isLocalTransaction()) {
626 LocalTransactionId t = (LocalTransactionId) txid;
627 KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
628 kahaTxId.setConnectionId(t.getConnectionId().getValue());
629 kahaTxId.setTransactionId(t.getValue());
630 rc.setLocalTransactionId(kahaTxId);
631 } else {
632 XATransactionId t = (XATransactionId) txid;
633 KahaXATransactionId kahaTxId = new KahaXATransactionId();
634 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
635 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
636 kahaTxId.setFormatId(t.getFormatId());
637 rc.setXaTransactionId(kahaTxId);
638 }
639 return rc;
640 }
641
642 @Override
643 public Locker createDefaultLocker() throws IOException {
644 SharedFileLocker locker = new SharedFileLocker();
645 locker.configure(this);
646 return locker;
647 }
648
649 @Override
650 public void init() throws Exception {}
651
652 @Override
653 public String toString() {
654 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
655 return "KahaDBPersistenceAdapter[" + path + "]";
656 }
657
658 }