001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;
020
021import java.io.File;
022import java.io.IOException;
023import java.util.Set;
024import java.util.concurrent.Callable;
025
026import javax.management.ObjectName;
027
028import org.apache.activemq.broker.BrokerService;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.broker.LockableServiceSupport;
031import org.apache.activemq.broker.Locker;
032import org.apache.activemq.broker.jmx.AnnotatedMBean;
033import org.apache.activemq.broker.jmx.PersistenceAdapterView;
034import org.apache.activemq.broker.scheduler.JobSchedulerStore;
035import org.apache.activemq.command.ActiveMQDestination;
036import org.apache.activemq.command.ActiveMQQueue;
037import org.apache.activemq.command.ActiveMQTopic;
038import org.apache.activemq.command.LocalTransactionId;
039import org.apache.activemq.command.ProducerId;
040import org.apache.activemq.command.TransactionId;
041import org.apache.activemq.command.XATransactionId;
042import org.apache.activemq.protobuf.Buffer;
043import org.apache.activemq.store.JournaledStore;
044import org.apache.activemq.store.MessageStore;
045import org.apache.activemq.store.PersistenceAdapter;
046import org.apache.activemq.store.SharedFileLocker;
047import org.apache.activemq.store.TopicMessageStore;
048import org.apache.activemq.store.TransactionIdTransformer;
049import org.apache.activemq.store.TransactionIdTransformerAware;
050import org.apache.activemq.store.TransactionStore;
051import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
052import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
053import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
054import org.apache.activemq.usage.SystemUsage;
055import org.apache.activemq.util.ServiceStopper;
056
057/**
058 * An implementation of {@link PersistenceAdapter} designed for use with
059 * KahaDB - Embedded Lightweight Non-Relational Database
060 *
061 * @org.apache.xbean.XBean element="kahaDB"
062 *
063 */
064public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore, TransactionIdTransformerAware {
065    private final KahaDBStore letter = new KahaDBStore();
066
067    /**
068     * @param context
069     * @throws IOException
070     * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
071     */
072    @Override
073    public void beginTransaction(ConnectionContext context) throws IOException {
074        this.letter.beginTransaction(context);
075    }
076
077    /**
078     * @param sync
079     * @throws IOException
080     * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
081     */
082    @Override
083    public void checkpoint(boolean sync) throws IOException {
084        this.letter.checkpoint(sync);
085    }
086
087    /**
088     * @param context
089     * @throws IOException
090     * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
091     */
092    @Override
093    public void commitTransaction(ConnectionContext context) throws IOException {
094        this.letter.commitTransaction(context);
095    }
096
097    /**
098     * @param destination
099     * @return MessageStore
100     * @throws IOException
101     * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
102     */
103    @Override
104    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
105        return this.letter.createQueueMessageStore(destination);
106    }
107
108    /**
109     * @param destination
110     * @return TopicMessageStore
111     * @throws IOException
112     * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
113     */
114    @Override
115    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
116        return this.letter.createTopicMessageStore(destination);
117    }
118
119    /**
120     * @return TransactionStore
121     * @throws IOException
122     * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
123     */
124    @Override
125    public TransactionStore createTransactionStore() throws IOException {
126        return this.letter.createTransactionStore();
127    }
128
129    /**
130     * @throws IOException
131     * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
132     */
133    @Override
134    public void deleteAllMessages() throws IOException {
135        this.letter.deleteAllMessages();
136    }
137
138    /**
139     * @return destinations
140     * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
141     */
142    @Override
143    public Set<ActiveMQDestination> getDestinations() {
144        return this.letter.getDestinations();
145    }
146
147    /**
148     * @return lastMessageBrokerSequenceId
149     * @throws IOException
150     * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
151     */
152    @Override
153    public long getLastMessageBrokerSequenceId() throws IOException {
154        return this.letter.getLastMessageBrokerSequenceId();
155    }
156
157    @Override
158    public long getLastProducerSequenceId(ProducerId id) throws IOException {
159        return this.letter.getLastProducerSequenceId(id);
160    }
161
162    /**
163     * @param destination
164     * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
165     */
166    @Override
167    public void removeQueueMessageStore(ActiveMQQueue destination) {
168        this.letter.removeQueueMessageStore(destination);
169    }
170
171    /**
172     * @param destination
173     * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
174     */
175    @Override
176    public void removeTopicMessageStore(ActiveMQTopic destination) {
177        this.letter.removeTopicMessageStore(destination);
178    }
179
180    /**
181     * @param context
182     * @throws IOException
183     * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
184     */
185    @Override
186    public void rollbackTransaction(ConnectionContext context) throws IOException {
187        this.letter.rollbackTransaction(context);
188    }
189
190    /**
191     * @param brokerName
192     * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
193     */
194    @Override
195    public void setBrokerName(String brokerName) {
196        this.letter.setBrokerName(brokerName);
197    }
198
199    /**
200     * @param usageManager
201     * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
202     */
203    @Override
204    public void setUsageManager(SystemUsage usageManager) {
205        this.letter.setUsageManager(usageManager);
206    }
207
208    /**
209     * @return the size of the store
210     * @see org.apache.activemq.store.PersistenceAdapter#size()
211     */
212    @Override
213    public long size() {
214        return this.letter.size();
215    }
216
217    /**
218     * @throws Exception
219     * @see org.apache.activemq.Service#start()
220     */
221    @Override
222    public void doStart() throws Exception {
223        this.letter.start();
224
225        if (brokerService != null && brokerService.isUseJmx()) {
226            PersistenceAdapterView view = new PersistenceAdapterView(this);
227            view.setInflightTransactionViewCallable(new Callable<String>() {
228                @Override
229                public String call() throws Exception {
230                    return letter.getTransactions();
231                }
232            });
233            view.setDataViewCallable(new Callable<String>() {
234                @Override
235                public String call() throws Exception {
236                    return letter.getJournal().getFileMap().keySet().toString();
237                }
238            });
239            AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view,
240                    createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString()));
241        }
242    }
243
244    /**
245     * @throws Exception
246     * @see org.apache.activemq.Service#stop()
247     */
248    @Override
249    public void doStop(ServiceStopper stopper) throws Exception {
250        this.letter.stop();
251
252        if (brokerService != null && brokerService.isUseJmx()) {
253            ObjectName brokerObjectName = brokerService.getBrokerObjectName();
254            brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString()));
255        }
256    }
257
258    /**
259     * Get the journalMaxFileLength
260     *
261     * @return the journalMaxFileLength
262     */
263    @Override
264    public int getJournalMaxFileLength() {
265        return this.letter.getJournalMaxFileLength();
266    }
267
268    /**
269     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
270     * be used
271     *
272     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
273     */
274    public void setJournalMaxFileLength(int journalMaxFileLength) {
275        this.letter.setJournalMaxFileLength(journalMaxFileLength);
276    }
277
278    /**
279     * Set the max number of producers (LRU cache) to track for duplicate sends
280     */
281    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
282        this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
283    }
284
285    public int getMaxFailoverProducersToTrack() {
286        return this.letter.getMaxFailoverProducersToTrack();
287    }
288
289    /**
290     * set the audit window depth for duplicate suppression (should exceed the max transaction
291     * batch)
292     */
293    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
294        this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
295    }
296
297    public int getFailoverProducersAuditDepth() {
298        return this.letter.getFailoverProducersAuditDepth();
299    }
300
301    /**
302     * Get the checkpointInterval
303     *
304     * @return the checkpointInterval
305     */
306    public long getCheckpointInterval() {
307        return this.letter.getCheckpointInterval();
308    }
309
310    /**
311     * Set the checkpointInterval
312     *
313     * @param checkpointInterval
314     *            the checkpointInterval to set
315     */
316    public void setCheckpointInterval(long checkpointInterval) {
317        this.letter.setCheckpointInterval(checkpointInterval);
318    }
319
320    /**
321     * Get the cleanupInterval
322     *
323     * @return the cleanupInterval
324     */
325    public long getCleanupInterval() {
326        return this.letter.getCleanupInterval();
327    }
328
329    /**
330     * Set the cleanupInterval
331     *
332     * @param cleanupInterval
333     *            the cleanupInterval to set
334     */
335    public void setCleanupInterval(long cleanupInterval) {
336        this.letter.setCleanupInterval(cleanupInterval);
337    }
338
339    /**
340     * Get the indexWriteBatchSize
341     *
342     * @return the indexWriteBatchSize
343     */
344    public int getIndexWriteBatchSize() {
345        return this.letter.getIndexWriteBatchSize();
346    }
347
348    /**
349     * Set the indexWriteBatchSize
350     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
351     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
352     * @param indexWriteBatchSize
353     *            the indexWriteBatchSize to set
354     */
355    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
356        this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
357    }
358
359    /**
360     * Get the journalMaxWriteBatchSize
361     *
362     * @return the journalMaxWriteBatchSize
363     */
364    public int getJournalMaxWriteBatchSize() {
365        return this.letter.getJournalMaxWriteBatchSize();
366    }
367
368    /**
369     * Set the journalMaxWriteBatchSize
370     *  * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
371     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
372     * @param journalMaxWriteBatchSize
373     *            the journalMaxWriteBatchSize to set
374     */
375    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
376        this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
377    }
378
379    /**
380     * Get the enableIndexWriteAsync
381     *
382     * @return the enableIndexWriteAsync
383     */
384    public boolean isEnableIndexWriteAsync() {
385        return this.letter.isEnableIndexWriteAsync();
386    }
387
388    /**
389     * Set the enableIndexWriteAsync
390     *
391     * @param enableIndexWriteAsync
392     *            the enableIndexWriteAsync to set
393     */
394    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
395        this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
396    }
397
398    /**
399     * Get the directory
400     *
401     * @return the directory
402     */
403    @Override
404    public File getDirectory() {
405        return this.letter.getDirectory();
406    }
407
408    /**
409     * @param dir
410     * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
411     */
412    @Override
413    public void setDirectory(File dir) {
414        this.letter.setDirectory(dir);
415    }
416
417    /**
418     * @return the currently configured location of the KahaDB index files.
419     */
420    public File getIndexDirectory() {
421        return this.letter.getIndexDirectory();
422    }
423
424    /**
425     * Sets the directory where KahaDB index files should be written.
426     *
427     * @param indexDirectory
428     *        the directory where the KahaDB store index files should be written.
429     */
430    public void setIndexDirectory(File indexDirectory) {
431        this.letter.setIndexDirectory(indexDirectory);
432    }
433
434    /**
435     * Get the enableJournalDiskSyncs
436     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
437     * @return the enableJournalDiskSyncs
438     */
439    public boolean isEnableJournalDiskSyncs() {
440        return this.letter.isEnableJournalDiskSyncs();
441    }
442
443    /**
444     * Set the enableJournalDiskSyncs
445     *
446     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
447     * @param enableJournalDiskSyncs
448     *            the enableJournalDiskSyncs to set
449     */
450    public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
451        this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
452    }
453
454    /**
455     * @return
456     */
457    public String getJournalDiskSyncStrategy() {
458        return letter.getJournalDiskSyncStrategy();
459    }
460
461    /**
462     * @param journalDiskSyncStrategy
463     */
464    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
465        letter.setJournalDiskSyncStrategy(journalDiskSyncStrategy);
466    }
467
468    /**
469     * @return
470     */
471    public long getJournalDiskSyncInterval() {
472        return letter.getJournalDiskSyncInterval();
473    }
474
475    /**
476     * @param journalDiskSyncInterval
477     */
478    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
479        letter.setJournalDiskSyncInterval(journalDiskSyncInterval);
480    }
481
482    /**
483     * Get the indexCacheSize
484     *
485     * @return the indexCacheSize
486     */
487    public int getIndexCacheSize() {
488        return this.letter.getIndexCacheSize();
489    }
490
491    /**
492     * Set the indexCacheSize
493     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
494     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
495     * @param indexCacheSize
496     *            the indexCacheSize to set
497     */
498    public void setIndexCacheSize(int indexCacheSize) {
499        this.letter.setIndexCacheSize(indexCacheSize);
500    }
501
502    /**
503     * Get the ignoreMissingJournalfiles
504     *
505     * @return the ignoreMissingJournalfiles
506     */
507    public boolean isIgnoreMissingJournalfiles() {
508        return this.letter.isIgnoreMissingJournalfiles();
509    }
510
511    /**
512     * Set the ignoreMissingJournalfiles
513     *
514     * @param ignoreMissingJournalfiles
515     *            the ignoreMissingJournalfiles to set
516     */
517    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
518        this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
519    }
520
521    public boolean isChecksumJournalFiles() {
522        return letter.isChecksumJournalFiles();
523    }
524
525    public boolean isCheckForCorruptJournalFiles() {
526        return letter.isCheckForCorruptJournalFiles();
527    }
528
529    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
530        letter.setChecksumJournalFiles(checksumJournalFiles);
531    }
532
533    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
534        letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
535    }
536
537    @Override
538    public void setBrokerService(BrokerService brokerService) {
539        super.setBrokerService(brokerService);
540        letter.setBrokerService(brokerService);
541    }
542
543    public String getPreallocationScope() {
544        return letter.getPreallocationScope();
545    }
546
547    public void setPreallocationScope(String preallocationScope) {
548        this.letter.setPreallocationScope(preallocationScope);
549    }
550
551    public String getPreallocationStrategy() {
552        return letter.getPreallocationStrategy();
553    }
554
555    public void setPreallocationStrategy(String preallocationStrategy) {
556        this.letter.setPreallocationStrategy(preallocationStrategy);
557    }
558
559    public boolean isArchiveDataLogs() {
560        return letter.isArchiveDataLogs();
561    }
562
563    public void setArchiveDataLogs(boolean archiveDataLogs) {
564        letter.setArchiveDataLogs(archiveDataLogs);
565    }
566
567    public File getDirectoryArchive() {
568        return letter.getDirectoryArchive();
569    }
570
571    public void setDirectoryArchive(File directoryArchive) {
572        letter.setDirectoryArchive(directoryArchive);
573    }
574
575    public boolean isConcurrentStoreAndDispatchQueues() {
576        return letter.isConcurrentStoreAndDispatchQueues();
577    }
578
579    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
580        letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
581    }
582
583    public boolean isConcurrentStoreAndDispatchTopics() {
584        return letter.isConcurrentStoreAndDispatchTopics();
585    }
586
587    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
588        letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
589    }
590
591    public int getMaxAsyncJobs() {
592        return letter.getMaxAsyncJobs();
593    }
594    /**
595     * @param maxAsyncJobs
596     *            the maxAsyncJobs to set
597     */
598    public void setMaxAsyncJobs(int maxAsyncJobs) {
599        letter.setMaxAsyncJobs(maxAsyncJobs);
600    }
601
602    /**
603     * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
604     *
605     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
606     */
607    @Deprecated
608    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
609       getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
610    }
611
612    public boolean getForceRecoverIndex() {
613        return letter.getForceRecoverIndex();
614    }
615
616    public void setForceRecoverIndex(boolean forceRecoverIndex) {
617        letter.setForceRecoverIndex(forceRecoverIndex);
618    }
619
620    public boolean isArchiveCorruptedIndex() {
621        return letter.isArchiveCorruptedIndex();
622    }
623
624    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
625        letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
626    }
627
628    public float getIndexLFUEvictionFactor() {
629        return letter.getIndexLFUEvictionFactor();
630    }
631
632    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
633        letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
634    }
635
636    public boolean isUseIndexLFRUEviction() {
637        return letter.isUseIndexLFRUEviction();
638    }
639
640    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
641        letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
642    }
643
644    public void setEnableIndexDiskSyncs(boolean diskSyncs) {
645        letter.setEnableIndexDiskSyncs(diskSyncs);
646    }
647
648    public boolean isEnableIndexDiskSyncs() {
649        return letter.isEnableIndexDiskSyncs();
650    }
651
652    public void setEnableIndexRecoveryFile(boolean enable) {
653        letter.setEnableIndexRecoveryFile(enable);
654    }
655
656    public boolean  isEnableIndexRecoveryFile() {
657        return letter.isEnableIndexRecoveryFile();
658    }
659
660    public void setEnableIndexPageCaching(boolean enable) {
661        letter.setEnableIndexPageCaching(enable);
662    }
663
664    public boolean isEnableIndexPageCaching() {
665        return letter.isEnableIndexPageCaching();
666    }
667
668    public int getCompactAcksAfterNoGC() {
669        return letter.getCompactAcksAfterNoGC();
670    }
671
672    /**
673     * Sets the number of GC cycles where no journal logs were removed before an attempt to
674     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
675     * <p>
676     * A value of -1 will disable this feature.
677     *
678     * @param compactAcksAfterNoGC
679     *      Number of empty GC cycles before we rewrite old ACKS.
680     */
681    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
682        this.letter.setCompactAcksAfterNoGC(compactAcksAfterNoGC);
683    }
684
685    public boolean isCompactAcksIgnoresStoreGrowth() {
686        return this.letter.isCompactAcksIgnoresStoreGrowth();
687    }
688
689    /**
690     * Configure if Ack compaction will occur regardless of continued growth of the
691     * journal logs meaning that the store has not run out of space yet.  Because the
692     * compaction operation can be costly this value is defaulted to off and the Ack
693     * compaction is only done when it seems that the store cannot grow and larger.
694     *
695     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
696     */
697    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
698        this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth);
699    }
700
701    /**
702     * Returns whether Ack compaction is enabled
703     *
704     * @return enableAckCompaction
705     */
706    public boolean isEnableAckCompaction() {
707        return letter.isEnableAckCompaction();
708    }
709
710    /**
711     * Configure if the Ack compaction task should be enabled to run
712     *
713     * @param enableAckCompaction
714     */
715    public void setEnableAckCompaction(boolean enableAckCompaction) {
716        letter.setEnableAckCompaction(enableAckCompaction);
717    }
718
719    /**
720     * Whether non-blocking subscription statistics have been enabled
721     *
722     * @return
723     */
724    public boolean isEnableSubscriptionStatistics() {
725        return letter.isEnableSubscriptionStatistics();
726    }
727
728    /**
729     * Enable caching statistics for each subscription to allow non-blocking
730     * retrieval of metrics.  This could incur some overhead to compute if there are a lot
731     * of subscriptions.
732     *
733     * @param enableSubscriptionStatistics
734     */
735    public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics) {
736        letter.setEnableSubscriptionStatistics(enableSubscriptionStatistics);
737    }
738
739    public KahaDBStore getStore() {
740        return letter;
741    }
742
743    public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
744        if (txid == null) {
745            return null;
746        }
747        KahaTransactionInfo rc = new KahaTransactionInfo();
748
749        if (txid.isLocalTransaction()) {
750            LocalTransactionId t = (LocalTransactionId) txid;
751            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
752            kahaTxId.setConnectionId(t.getConnectionId().getValue());
753            kahaTxId.setTransactionId(t.getValue());
754            rc.setLocalTransactionId(kahaTxId);
755        } else {
756            XATransactionId t = (XATransactionId) txid;
757            KahaXATransactionId kahaTxId = new KahaXATransactionId();
758            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
759            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
760            kahaTxId.setFormatId(t.getFormatId());
761            rc.setXaTransactionId(kahaTxId);
762        }
763        return rc;
764    }
765
766    @Override
767    public Locker createDefaultLocker() throws IOException {
768        SharedFileLocker locker = new SharedFileLocker();
769        locker.configure(this);
770        return locker;
771    }
772
773    @Override
774    public void init() throws Exception {}
775
776    @Override
777    public String toString() {
778        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
779        return "KahaDBPersistenceAdapter[" + path + "]";
780    }
781
782    @Override
783    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
784        getStore().setTransactionIdTransformer(transactionIdTransformer);
785    }
786
787    @Override
788    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
789        return this.letter.createJobSchedulerStore();
790    }
791}