001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;
020
021import java.io.File;
022import java.io.IOException;
023import java.util.Set;
024import java.util.concurrent.Callable;
025
026import javax.management.ObjectName;
027
028import org.apache.activemq.broker.BrokerService;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.broker.LockableServiceSupport;
031import org.apache.activemq.broker.Locker;
032import org.apache.activemq.broker.jmx.AnnotatedMBean;
033import org.apache.activemq.broker.jmx.PersistenceAdapterView;
034import org.apache.activemq.broker.scheduler.JobSchedulerStore;
035import org.apache.activemq.command.ActiveMQDestination;
036import org.apache.activemq.command.ActiveMQQueue;
037import org.apache.activemq.command.ActiveMQTopic;
038import org.apache.activemq.command.LocalTransactionId;
039import org.apache.activemq.command.ProducerId;
040import org.apache.activemq.command.TransactionId;
041import org.apache.activemq.command.XATransactionId;
042import org.apache.activemq.protobuf.Buffer;
043import org.apache.activemq.store.JournaledStore;
044import org.apache.activemq.store.MessageStore;
045import org.apache.activemq.store.NoLocalSubscriptionAware;
046import org.apache.activemq.store.PersistenceAdapter;
047import org.apache.activemq.store.SharedFileLocker;
048import org.apache.activemq.store.TopicMessageStore;
049import org.apache.activemq.store.TransactionIdTransformer;
050import org.apache.activemq.store.TransactionIdTransformerAware;
051import org.apache.activemq.store.TransactionStore;
052import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
053import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
054import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
055import org.apache.activemq.usage.SystemUsage;
056import org.apache.activemq.util.ServiceStopper;
057
058/**
059 * An implementation of {@link PersistenceAdapter} designed for use with
060 * KahaDB - Embedded Lightweight Non-Relational Database
061 *
062 * @org.apache.xbean.XBean element="kahaDB"
063 *
064 */
065public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter,
066    JournaledStore, TransactionIdTransformerAware, NoLocalSubscriptionAware {
067
068    private final KahaDBStore letter = new KahaDBStore();
069
070    /**
071     * @param context
072     * @throws IOException
073     * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
074     */
075    @Override
076    public void beginTransaction(ConnectionContext context) throws IOException {
077        this.letter.beginTransaction(context);
078    }
079
080    /**
081     * @param sync
082     * @throws IOException
083     * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
084     */
085    @Override
086    public void checkpoint(boolean sync) throws IOException {
087        this.letter.checkpoint(sync);
088    }
089
090    /**
091     * @param context
092     * @throws IOException
093     * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
094     */
095    @Override
096    public void commitTransaction(ConnectionContext context) throws IOException {
097        this.letter.commitTransaction(context);
098    }
099
100    /**
101     * @param destination
102     * @return MessageStore
103     * @throws IOException
104     * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
105     */
106    @Override
107    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
108        return this.letter.createQueueMessageStore(destination);
109    }
110
111    /**
112     * @param destination
113     * @return TopicMessageStore
114     * @throws IOException
115     * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
116     */
117    @Override
118    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
119        return this.letter.createTopicMessageStore(destination);
120    }
121
122    /**
123     * @return TransactionStore
124     * @throws IOException
125     * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
126     */
127    @Override
128    public TransactionStore createTransactionStore() throws IOException {
129        return this.letter.createTransactionStore();
130    }
131
132    /**
133     * @throws IOException
134     * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
135     */
136    @Override
137    public void deleteAllMessages() throws IOException {
138        this.letter.deleteAllMessages();
139    }
140
141    /**
142     * @return destinations
143     * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
144     */
145    @Override
146    public Set<ActiveMQDestination> getDestinations() {
147        return this.letter.getDestinations();
148    }
149
150    /**
151     * @return lastMessageBrokerSequenceId
152     * @throws IOException
153     * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
154     */
155    @Override
156    public long getLastMessageBrokerSequenceId() throws IOException {
157        return this.letter.getLastMessageBrokerSequenceId();
158    }
159
160    @Override
161    public long getLastProducerSequenceId(ProducerId id) throws IOException {
162        return this.letter.getLastProducerSequenceId(id);
163    }
164
165    /**
166     * @param destination
167     * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
168     */
169    @Override
170    public void removeQueueMessageStore(ActiveMQQueue destination) {
171        this.letter.removeQueueMessageStore(destination);
172    }
173
174    /**
175     * @param destination
176     * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
177     */
178    @Override
179    public void removeTopicMessageStore(ActiveMQTopic destination) {
180        this.letter.removeTopicMessageStore(destination);
181    }
182
183    /**
184     * @param context
185     * @throws IOException
186     * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
187     */
188    @Override
189    public void rollbackTransaction(ConnectionContext context) throws IOException {
190        this.letter.rollbackTransaction(context);
191    }
192
193    /**
194     * @param brokerName
195     * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
196     */
197    @Override
198    public void setBrokerName(String brokerName) {
199        this.letter.setBrokerName(brokerName);
200    }
201
202    /**
203     * @param usageManager
204     * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
205     */
206    @Override
207    public void setUsageManager(SystemUsage usageManager) {
208        this.letter.setUsageManager(usageManager);
209    }
210
211    /**
212     * @return the size of the store
213     * @see org.apache.activemq.store.PersistenceAdapter#size()
214     */
215    @Override
216    public long size() {
217        return this.letter.size();
218    }
219
220    /**
221     * @throws Exception
222     * @see org.apache.activemq.Service#start()
223     */
224    @Override
225    public void doStart() throws Exception {
226        this.letter.start();
227
228        if (brokerService != null && brokerService.isUseJmx()) {
229            PersistenceAdapterView view = new PersistenceAdapterView(this);
230            view.setInflightTransactionViewCallable(new Callable<String>() {
231                @Override
232                public String call() throws Exception {
233                    return letter.getTransactions();
234                }
235            });
236            view.setDataViewCallable(new Callable<String>() {
237                @Override
238                public String call() throws Exception {
239                    return letter.getJournal().getFileMap().keySet().toString();
240                }
241            });
242            AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view,
243                    createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString()));
244        }
245    }
246
247    /**
248     * @throws Exception
249     * @see org.apache.activemq.Service#stop()
250     */
251    @Override
252    public void doStop(ServiceStopper stopper) throws Exception {
253        this.letter.stop();
254
255        if (brokerService != null && brokerService.isUseJmx()) {
256            ObjectName brokerObjectName = brokerService.getBrokerObjectName();
257            brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString()));
258        }
259    }
260
261    /**
262     * Get the journalMaxFileLength
263     *
264     * @return the journalMaxFileLength
265     */
266    @Override
267    public int getJournalMaxFileLength() {
268        return this.letter.getJournalMaxFileLength();
269    }
270
271    /**
272     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
273     * be used
274     *
275     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
276     */
277    public void setJournalMaxFileLength(int journalMaxFileLength) {
278        this.letter.setJournalMaxFileLength(journalMaxFileLength);
279    }
280
281    /**
282     * Set the max number of producers (LRU cache) to track for duplicate sends
283     */
284    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
285        this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
286    }
287
288    public int getMaxFailoverProducersToTrack() {
289        return this.letter.getMaxFailoverProducersToTrack();
290    }
291
292    /**
293     * set the audit window depth for duplicate suppression (should exceed the max transaction
294     * batch)
295     */
296    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
297        this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
298    }
299
300    public int getFailoverProducersAuditDepth() {
301        return this.letter.getFailoverProducersAuditDepth();
302    }
303
304    /**
305     * Get the checkpointInterval
306     *
307     * @return the checkpointInterval
308     */
309    public long getCheckpointInterval() {
310        return this.letter.getCheckpointInterval();
311    }
312
313    /**
314     * Set the checkpointInterval
315     *
316     * @param checkpointInterval
317     *            the checkpointInterval to set
318     */
319    public void setCheckpointInterval(long checkpointInterval) {
320        this.letter.setCheckpointInterval(checkpointInterval);
321    }
322
323    /**
324     * Get the cleanupInterval
325     *
326     * @return the cleanupInterval
327     */
328    public long getCleanupInterval() {
329        return this.letter.getCleanupInterval();
330    }
331
332    /**
333     * Set the cleanupInterval
334     *
335     * @param cleanupInterval
336     *            the cleanupInterval to set
337     */
338    public void setCleanupInterval(long cleanupInterval) {
339        this.letter.setCleanupInterval(cleanupInterval);
340    }
341
342    /**
343     * Get the indexWriteBatchSize
344     *
345     * @return the indexWriteBatchSize
346     */
347    public int getIndexWriteBatchSize() {
348        return this.letter.getIndexWriteBatchSize();
349    }
350
351    /**
352     * Set the indexWriteBatchSize
353     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
354     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
355     * @param indexWriteBatchSize
356     *            the indexWriteBatchSize to set
357     */
358    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
359        this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
360    }
361
362    /**
363     * Get the journalMaxWriteBatchSize
364     *
365     * @return the journalMaxWriteBatchSize
366     */
367    public int getJournalMaxWriteBatchSize() {
368        return this.letter.getJournalMaxWriteBatchSize();
369    }
370
371    /**
372     * Set the journalMaxWriteBatchSize
373     *  * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
374     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
375     * @param journalMaxWriteBatchSize
376     *            the journalMaxWriteBatchSize to set
377     */
378    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
379        this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
380    }
381
382    /**
383     * Get the enableIndexWriteAsync
384     *
385     * @return the enableIndexWriteAsync
386     */
387    public boolean isEnableIndexWriteAsync() {
388        return this.letter.isEnableIndexWriteAsync();
389    }
390
391    /**
392     * Set the enableIndexWriteAsync
393     *
394     * @param enableIndexWriteAsync
395     *            the enableIndexWriteAsync to set
396     */
397    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
398        this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
399    }
400
401    /**
402     * Get the directory
403     *
404     * @return the directory
405     */
406    @Override
407    public File getDirectory() {
408        return this.letter.getDirectory();
409    }
410
411    /**
412     * @param dir
413     * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
414     */
415    @Override
416    public void setDirectory(File dir) {
417        this.letter.setDirectory(dir);
418    }
419
420    /**
421     * @return the currently configured location of the KahaDB index files.
422     */
423    public File getIndexDirectory() {
424        return this.letter.getIndexDirectory();
425    }
426
427    /**
428     * Sets the directory where KahaDB index files should be written.
429     *
430     * @param indexDirectory
431     *        the directory where the KahaDB store index files should be written.
432     */
433    public void setIndexDirectory(File indexDirectory) {
434        this.letter.setIndexDirectory(indexDirectory);
435    }
436
437    /**
438     * Get the enableJournalDiskSyncs
439     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
440     * @return the enableJournalDiskSyncs
441     */
442    public boolean isEnableJournalDiskSyncs() {
443        return this.letter.isEnableJournalDiskSyncs();
444    }
445
446    /**
447     * Set the enableJournalDiskSyncs
448     *
449     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
450     * @param enableJournalDiskSyncs
451     *            the enableJournalDiskSyncs to set
452     */
453    public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
454        this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
455    }
456
457    /**
458     * @return
459     */
460    public String getJournalDiskSyncStrategy() {
461        return letter.getJournalDiskSyncStrategy();
462    }
463
464    /**
465     * @param journalDiskSyncStrategy
466     */
467    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
468        letter.setJournalDiskSyncStrategy(journalDiskSyncStrategy);
469    }
470
471    /**
472     * @return
473     */
474    public long getJournalDiskSyncInterval() {
475        return letter.getJournalDiskSyncInterval();
476    }
477
478    /**
479     * @param journalDiskSyncInterval
480     */
481    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
482        letter.setJournalDiskSyncInterval(journalDiskSyncInterval);
483    }
484
485    /**
486     * Get the indexCacheSize
487     *
488     * @return the indexCacheSize
489     */
490    public int getIndexCacheSize() {
491        return this.letter.getIndexCacheSize();
492    }
493
494    /**
495     * Set the indexCacheSize
496     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
497     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
498     * @param indexCacheSize
499     *            the indexCacheSize to set
500     */
501    public void setIndexCacheSize(int indexCacheSize) {
502        this.letter.setIndexCacheSize(indexCacheSize);
503    }
504
505    /**
506     * Get the ignoreMissingJournalfiles
507     *
508     * @return the ignoreMissingJournalfiles
509     */
510    public boolean isIgnoreMissingJournalfiles() {
511        return this.letter.isIgnoreMissingJournalfiles();
512    }
513
514    /**
515     * Set the ignoreMissingJournalfiles
516     *
517     * @param ignoreMissingJournalfiles
518     *            the ignoreMissingJournalfiles to set
519     */
520    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
521        this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
522    }
523
524    public boolean isChecksumJournalFiles() {
525        return letter.isChecksumJournalFiles();
526    }
527
528    public boolean isCheckForCorruptJournalFiles() {
529        return letter.isCheckForCorruptJournalFiles();
530    }
531
532    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
533        letter.setChecksumJournalFiles(checksumJournalFiles);
534    }
535
536    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
537        letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
538    }
539
540    @Override
541    public void setBrokerService(BrokerService brokerService) {
542        super.setBrokerService(brokerService);
543        letter.setBrokerService(brokerService);
544    }
545
546    public String getPreallocationScope() {
547        return letter.getPreallocationScope();
548    }
549
550    public void setPreallocationScope(String preallocationScope) {
551        this.letter.setPreallocationScope(preallocationScope);
552    }
553
554    public String getPreallocationStrategy() {
555        return letter.getPreallocationStrategy();
556    }
557
558    public void setPreallocationStrategy(String preallocationStrategy) {
559        this.letter.setPreallocationStrategy(preallocationStrategy);
560    }
561
562    public boolean isArchiveDataLogs() {
563        return letter.isArchiveDataLogs();
564    }
565
566    public void setArchiveDataLogs(boolean archiveDataLogs) {
567        letter.setArchiveDataLogs(archiveDataLogs);
568    }
569
570    public File getDirectoryArchive() {
571        return letter.getDirectoryArchive();
572    }
573
574    public void setDirectoryArchive(File directoryArchive) {
575        letter.setDirectoryArchive(directoryArchive);
576    }
577
578    public boolean isConcurrentStoreAndDispatchQueues() {
579        return letter.isConcurrentStoreAndDispatchQueues();
580    }
581
582    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
583        letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
584    }
585
586    public boolean isConcurrentStoreAndDispatchTopics() {
587        return letter.isConcurrentStoreAndDispatchTopics();
588    }
589
590    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
591        letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
592    }
593
594    public int getMaxAsyncJobs() {
595        return letter.getMaxAsyncJobs();
596    }
597    /**
598     * @param maxAsyncJobs
599     *            the maxAsyncJobs to set
600     */
601    public void setMaxAsyncJobs(int maxAsyncJobs) {
602        letter.setMaxAsyncJobs(maxAsyncJobs);
603    }
604
605    /**
606     * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
607     *
608     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
609     */
610    @Deprecated
611    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
612       getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
613    }
614
615    public boolean getForceRecoverIndex() {
616        return letter.getForceRecoverIndex();
617    }
618
619    public void setForceRecoverIndex(boolean forceRecoverIndex) {
620        letter.setForceRecoverIndex(forceRecoverIndex);
621    }
622
623    public boolean isArchiveCorruptedIndex() {
624        return letter.isArchiveCorruptedIndex();
625    }
626
627    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
628        letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
629    }
630
631    public float getIndexLFUEvictionFactor() {
632        return letter.getIndexLFUEvictionFactor();
633    }
634
635    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
636        letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
637    }
638
639    public boolean isUseIndexLFRUEviction() {
640        return letter.isUseIndexLFRUEviction();
641    }
642
643    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
644        letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
645    }
646
647    public void setEnableIndexDiskSyncs(boolean diskSyncs) {
648        letter.setEnableIndexDiskSyncs(diskSyncs);
649    }
650
651    public boolean isEnableIndexDiskSyncs() {
652        return letter.isEnableIndexDiskSyncs();
653    }
654
655    public void setEnableIndexRecoveryFile(boolean enable) {
656        letter.setEnableIndexRecoveryFile(enable);
657    }
658
659    public boolean  isEnableIndexRecoveryFile() {
660        return letter.isEnableIndexRecoveryFile();
661    }
662
663    public void setEnableIndexPageCaching(boolean enable) {
664        letter.setEnableIndexPageCaching(enable);
665    }
666
667    public boolean isEnableIndexPageCaching() {
668        return letter.isEnableIndexPageCaching();
669    }
670
671    public int getCompactAcksAfterNoGC() {
672        return letter.getCompactAcksAfterNoGC();
673    }
674
675    /**
676     * Sets the number of GC cycles where no journal logs were removed before an attempt to
677     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
678     * <p>
679     * A value of -1 will disable this feature.
680     *
681     * @param compactAcksAfterNoGC
682     *      Number of empty GC cycles before we rewrite old ACKS.
683     */
684    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
685        this.letter.setCompactAcksAfterNoGC(compactAcksAfterNoGC);
686    }
687
688    public boolean isCompactAcksIgnoresStoreGrowth() {
689        return this.letter.isCompactAcksIgnoresStoreGrowth();
690    }
691
692    /**
693     * Configure if Ack compaction will occur regardless of continued growth of the
694     * journal logs meaning that the store has not run out of space yet.  Because the
695     * compaction operation can be costly this value is defaulted to off and the Ack
696     * compaction is only done when it seems that the store cannot grow and larger.
697     *
698     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
699     */
700    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
701        this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth);
702    }
703
704    /**
705     * Returns whether Ack compaction is enabled
706     *
707     * @return enableAckCompaction
708     */
709    public boolean isEnableAckCompaction() {
710        return letter.isEnableAckCompaction();
711    }
712
713    /**
714     * Configure if the Ack compaction task should be enabled to run
715     *
716     * @param enableAckCompaction
717     */
718    public void setEnableAckCompaction(boolean enableAckCompaction) {
719        letter.setEnableAckCompaction(enableAckCompaction);
720    }
721
722    /**
723     * Whether non-blocking subscription statistics have been enabled
724     *
725     * @return
726     */
727    public boolean isEnableSubscriptionStatistics() {
728        return letter.isEnableSubscriptionStatistics();
729    }
730
731    /**
732     * Enable caching statistics for each subscription to allow non-blocking
733     * retrieval of metrics.  This could incur some overhead to compute if there are a lot
734     * of subscriptions.
735     *
736     * @param enableSubscriptionStatistics
737     */
738    public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) {
739        letter.setEnableSubscriptionStatistics(enableSubscriptionStatistics);
740    }
741
742    public KahaDBStore getStore() {
743        return letter;
744    }
745
746    public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
747        if (txid == null) {
748            return null;
749        }
750        KahaTransactionInfo rc = new KahaTransactionInfo();
751
752        if (txid.isLocalTransaction()) {
753            LocalTransactionId t = (LocalTransactionId) txid;
754            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
755            kahaTxId.setConnectionId(t.getConnectionId().getValue());
756            kahaTxId.setTransactionId(t.getValue());
757            rc.setLocalTransactionId(kahaTxId);
758        } else {
759            XATransactionId t = (XATransactionId) txid;
760            KahaXATransactionId kahaTxId = new KahaXATransactionId();
761            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
762            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
763            kahaTxId.setFormatId(t.getFormatId());
764            rc.setXaTransactionId(kahaTxId);
765        }
766        return rc;
767    }
768
769    @Override
770    public Locker createDefaultLocker() throws IOException {
771        SharedFileLocker locker = new SharedFileLocker();
772        locker.configure(this);
773        return locker;
774    }
775
776    @Override
777    public void init() throws Exception {}
778
779    @Override
780    public String toString() {
781        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
782        return "KahaDBPersistenceAdapter[" + path + "]";
783    }
784
785    @Override
786    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
787        getStore().setTransactionIdTransformer(transactionIdTransformer);
788    }
789
790    @Override
791    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
792        return this.letter.createJobSchedulerStore();
793    }
794
795    /* (non-Javadoc)
796     * @see org.apache.activemq.store.NoLocalSubscriptionAware#isPersistNoLocal()
797     */
798    @Override
799    public boolean isPersistNoLocal() {
800        return this.letter.isPersistNoLocal();
801    }
802}