001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import org.apache.activemq.broker.BrokerService;
020    import org.apache.activemq.broker.ConnectionContext;
021    import org.apache.activemq.broker.LockableServiceSupport;
022    import org.apache.activemq.command.ActiveMQDestination;
023    import org.apache.activemq.command.ActiveMQQueue;
024    import org.apache.activemq.command.ActiveMQTopic;
025    import org.apache.activemq.command.LocalTransactionId;
026    import org.apache.activemq.command.ProducerId;
027    import org.apache.activemq.command.TransactionId;
028    import org.apache.activemq.command.XATransactionId;
029    import org.apache.activemq.protobuf.Buffer;
030    import org.apache.activemq.broker.Locker;
031    import org.apache.activemq.store.MessageStore;
032    import org.apache.activemq.store.PersistenceAdapter;
033    import org.apache.activemq.store.SharedFileLocker;
034    import org.apache.activemq.store.TopicMessageStore;
035    import org.apache.activemq.store.TransactionStore;
036    import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
037    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
038    import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
039    import org.apache.activemq.usage.SystemUsage;
040    import org.apache.activemq.util.ServiceStopper;
041    
042    import java.io.File;
043    import java.io.IOException;
044    import java.util.Set;
045    
046    /**
047     * An implementation of {@link PersistenceAdapter} designed for use with
048     * KahaDB - Embedded Lightweight Non-Relational Database
049     *
050     * @org.apache.xbean.XBean element="kahaDB"
051     *
052     */
053    public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter {
054        private final KahaDBStore letter = new KahaDBStore();
055    
056        /**
057         * @param context
058         * @throws IOException
059         * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
060         */
061        public void beginTransaction(ConnectionContext context) throws IOException {
062            this.letter.beginTransaction(context);
063        }
064    
065        /**
066         * @param sync
067         * @throws IOException
068         * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
069         */
070        public void checkpoint(boolean sync) throws IOException {
071            this.letter.checkpoint(sync);
072        }
073    
074        /**
075         * @param context
076         * @throws IOException
077         * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
078         */
079        public void commitTransaction(ConnectionContext context) throws IOException {
080            this.letter.commitTransaction(context);
081        }
082    
083        /**
084         * @param destination
085         * @return MessageStore
086         * @throws IOException
087         * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
088         */
089        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
090            return this.letter.createQueueMessageStore(destination);
091        }
092    
093        /**
094         * @param destination
095         * @return TopicMessageStore
096         * @throws IOException
097         * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
098         */
099        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
100            return this.letter.createTopicMessageStore(destination);
101        }
102    
103        /**
104         * @return TransactionStore
105         * @throws IOException
106         * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
107         */
108        public TransactionStore createTransactionStore() throws IOException {
109            return this.letter.createTransactionStore();
110        }
111    
112        /**
113         * @throws IOException
114         * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
115         */
116        public void deleteAllMessages() throws IOException {
117            this.letter.deleteAllMessages();
118        }
119    
120        /**
121         * @return destinations
122         * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
123         */
124        public Set<ActiveMQDestination> getDestinations() {
125            return this.letter.getDestinations();
126        }
127    
128        /**
129         * @return lastMessageBrokerSequenceId
130         * @throws IOException
131         * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
132         */
133        public long getLastMessageBrokerSequenceId() throws IOException {
134            return this.letter.getLastMessageBrokerSequenceId();
135        }
136    
137        public long getLastProducerSequenceId(ProducerId id) throws IOException {
138            return this.letter.getLastProducerSequenceId(id);
139        }
140    
141        /**
142         * @param destination
143         * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
144         */
145        public void removeQueueMessageStore(ActiveMQQueue destination) {
146            this.letter.removeQueueMessageStore(destination);
147        }
148    
149        /**
150         * @param destination
151         * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
152         */
153        public void removeTopicMessageStore(ActiveMQTopic destination) {
154            this.letter.removeTopicMessageStore(destination);
155        }
156    
157        /**
158         * @param context
159         * @throws IOException
160         * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
161         */
162        public void rollbackTransaction(ConnectionContext context) throws IOException {
163            this.letter.rollbackTransaction(context);
164        }
165    
166        /**
167         * @param brokerName
168         * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
169         */
170        public void setBrokerName(String brokerName) {
171            this.letter.setBrokerName(brokerName);
172        }
173    
174        /**
175         * @param usageManager
176         * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
177         */
178        public void setUsageManager(SystemUsage usageManager) {
179            this.letter.setUsageManager(usageManager);
180        }
181    
182        /**
183         * @return the size of the store
184         * @see org.apache.activemq.store.PersistenceAdapter#size()
185         */
186        public long size() {
187            return this.letter.size();
188        }
189    
190        /**
191         * @throws Exception
192         * @see org.apache.activemq.Service#start()
193         */
194        public void doStart() throws Exception {
195            this.letter.start();
196        }
197    
198        /**
199         * @throws Exception
200         * @see org.apache.activemq.Service#stop()
201         */
202        public void doStop(ServiceStopper stopper) throws Exception {
203            this.letter.stop();
204        }
205    
206        /**
207         * Get the journalMaxFileLength
208         *
209         * @return the journalMaxFileLength
210         */
211        public int getJournalMaxFileLength() {
212            return this.letter.getJournalMaxFileLength();
213        }
214    
215        /**
216         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
217         * be used
218         *
219         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
220         */
221        public void setJournalMaxFileLength(int journalMaxFileLength) {
222            this.letter.setJournalMaxFileLength(journalMaxFileLength);
223        }
224    
225        /**
226         * Set the max number of producers (LRU cache) to track for duplicate sends
227         */
228        public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
229            this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
230        }
231    
232        public int getMaxFailoverProducersToTrack() {
233            return this.letter.getMaxFailoverProducersToTrack();
234        }
235    
236        /**
237         * set the audit window depth for duplicate suppression (should exceed the max transaction
238         * batch)
239         */
240        public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
241            this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
242        }
243    
244        public int getFailoverProducersAuditDepth() {
245            return this.letter.getFailoverProducersAuditDepth();
246        }
247    
248        /**
249         * Get the checkpointInterval
250         *
251         * @return the checkpointInterval
252         */
253        public long getCheckpointInterval() {
254            return this.letter.getCheckpointInterval();
255        }
256    
257        /**
258         * Set the checkpointInterval
259         *
260         * @param checkpointInterval
261         *            the checkpointInterval to set
262         */
263        public void setCheckpointInterval(long checkpointInterval) {
264            this.letter.setCheckpointInterval(checkpointInterval);
265        }
266    
267        /**
268         * Get the cleanupInterval
269         *
270         * @return the cleanupInterval
271         */
272        public long getCleanupInterval() {
273            return this.letter.getCleanupInterval();
274        }
275    
276        /**
277         * Set the cleanupInterval
278         *
279         * @param cleanupInterval
280         *            the cleanupInterval to set
281         */
282        public void setCleanupInterval(long cleanupInterval) {
283            this.letter.setCleanupInterval(cleanupInterval);
284        }
285    
286        /**
287         * Get the indexWriteBatchSize
288         *
289         * @return the indexWriteBatchSize
290         */
291        public int getIndexWriteBatchSize() {
292            return this.letter.getIndexWriteBatchSize();
293        }
294    
295        /**
296         * Set the indexWriteBatchSize
297         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
298         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
299         * @param indexWriteBatchSize
300         *            the indexWriteBatchSize to set
301         */
302        public void setIndexWriteBatchSize(int indexWriteBatchSize) {
303            this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
304        }
305    
306        /**
307         * Get the journalMaxWriteBatchSize
308         *
309         * @return the journalMaxWriteBatchSize
310         */
311        public int getJournalMaxWriteBatchSize() {
312            return this.letter.getJournalMaxWriteBatchSize();
313        }
314    
315        /**
316         * Set the journalMaxWriteBatchSize
317         *  * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
318         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
319         * @param journalMaxWriteBatchSize
320         *            the journalMaxWriteBatchSize to set
321         */
322        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
323            this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
324        }
325    
326        /**
327         * Get the enableIndexWriteAsync
328         *
329         * @return the enableIndexWriteAsync
330         */
331        public boolean isEnableIndexWriteAsync() {
332            return this.letter.isEnableIndexWriteAsync();
333        }
334    
335        /**
336         * Set the enableIndexWriteAsync
337         *
338         * @param enableIndexWriteAsync
339         *            the enableIndexWriteAsync to set
340         */
341        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
342            this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
343        }
344    
345        /**
346         * Get the directory
347         *
348         * @return the directory
349         */
350        public File getDirectory() {
351            return this.letter.getDirectory();
352        }
353    
354        /**
355         * @param dir
356         * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
357         */
358        public void setDirectory(File dir) {
359            this.letter.setDirectory(dir);
360        }
361    
362        /**
363         * Get the enableJournalDiskSyncs
364         *
365         * @return the enableJournalDiskSyncs
366         */
367        public boolean isEnableJournalDiskSyncs() {
368            return this.letter.isEnableJournalDiskSyncs();
369        }
370    
371        /**
372         * Set the enableJournalDiskSyncs
373         *
374         * @param enableJournalDiskSyncs
375         *            the enableJournalDiskSyncs to set
376         */
377        public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
378            this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
379        }
380    
381        /**
382         * Get the indexCacheSize
383         *
384         * @return the indexCacheSize
385         */
386        public int getIndexCacheSize() {
387            return this.letter.getIndexCacheSize();
388        }
389    
390        /**
391         * Set the indexCacheSize
392         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
393         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
394         * @param indexCacheSize
395         *            the indexCacheSize to set
396         */
397        public void setIndexCacheSize(int indexCacheSize) {
398            this.letter.setIndexCacheSize(indexCacheSize);
399        }
400    
401        /**
402         * Get the ignoreMissingJournalfiles
403         *
404         * @return the ignoreMissingJournalfiles
405         */
406        public boolean isIgnoreMissingJournalfiles() {
407            return this.letter.isIgnoreMissingJournalfiles();
408        }
409    
410        /**
411         * Set the ignoreMissingJournalfiles
412         *
413         * @param ignoreMissingJournalfiles
414         *            the ignoreMissingJournalfiles to set
415         */
416        public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
417            this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
418        }
419    
420        public boolean isChecksumJournalFiles() {
421            return letter.isChecksumJournalFiles();
422        }
423    
424        public boolean isCheckForCorruptJournalFiles() {
425            return letter.isCheckForCorruptJournalFiles();
426        }
427    
428        public void setChecksumJournalFiles(boolean checksumJournalFiles) {
429            letter.setChecksumJournalFiles(checksumJournalFiles);
430        }
431    
432        public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
433            letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
434        }
435    
436        public void setBrokerService(BrokerService brokerService) {
437            letter.setBrokerService(brokerService);
438        }
439    
440        public boolean isArchiveDataLogs() {
441            return letter.isArchiveDataLogs();
442        }
443    
444        public void setArchiveDataLogs(boolean archiveDataLogs) {
445            letter.setArchiveDataLogs(archiveDataLogs);
446        }
447    
448        public File getDirectoryArchive() {
449            return letter.getDirectoryArchive();
450        }
451    
452        public void setDirectoryArchive(File directoryArchive) {
453            letter.setDirectoryArchive(directoryArchive);
454        }
455    
456        public boolean isConcurrentStoreAndDispatchQueues() {
457            return letter.isConcurrentStoreAndDispatchQueues();
458        }
459    
460        public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
461            letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
462        }
463    
464        public boolean isConcurrentStoreAndDispatchTopics() {
465            return letter.isConcurrentStoreAndDispatchTopics();
466        }
467    
468        public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
469            letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
470        }
471    
472        public int getMaxAsyncJobs() {
473            return letter.getMaxAsyncJobs();
474        }
475        /**
476         * @param maxAsyncJobs
477         *            the maxAsyncJobs to set
478         */
479        public void setMaxAsyncJobs(int maxAsyncJobs) {
480            letter.setMaxAsyncJobs(maxAsyncJobs);
481        }
482    
483        /**
484         * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
485         *
486         * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
487         */
488        @Deprecated
489        public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
490           getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
491        }
492    
493        public boolean getForceRecoverIndex() {
494            return letter.getForceRecoverIndex();
495        }
496    
497        public void setForceRecoverIndex(boolean forceRecoverIndex) {
498            letter.setForceRecoverIndex(forceRecoverIndex);
499        }
500    
501        public boolean isArchiveCorruptedIndex() {
502            return letter.isArchiveCorruptedIndex();
503        }
504    
505        public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
506            letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
507        }
508    
509        /**
510         * When true, persist the redelivery status such that the message redelivery flag can survive a broker failure
511         * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean)  true
512         */
513        public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
514            letter.setRewriteOnRedelivery(rewriteOnRedelivery);
515        }
516    
517        public boolean isRewriteOnRedelivery() {
518            return letter.isRewriteOnRedelivery();
519        }
520    
521        public float getIndexLFUEvictionFactor() {
522            return letter.getIndexLFUEvictionFactor();
523        }
524    
525        public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
526            letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
527        }
528    
529        public boolean isUseIndexLFRUEviction() {
530            return letter.isUseIndexLFRUEviction();
531        }
532    
533        public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
534            letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
535        }
536    
537        public void setEnableIndexDiskSyncs(boolean diskSyncs) {
538            letter.setEnableIndexDiskSyncs(diskSyncs);
539        }
540    
541        public boolean isEnableIndexDiskSyncs() {
542            return letter.isEnableIndexDiskSyncs();
543        }
544    
545        public void setEnableIndexRecoveryFile(boolean enable) {
546            letter.setEnableIndexRecoveryFile(enable);
547        }
548    
549        public boolean  isEnableIndexRecoveryFile() {
550            return letter.isEnableIndexRecoveryFile();
551        }
552    
553        public void setEnableIndexPageCaching(boolean enable) {
554            letter.setEnableIndexPageCaching(enable);
555        }
556    
557        public boolean isEnableIndexPageCaching() {
558            return letter.isEnableIndexPageCaching();
559        }
560    
561        public KahaDBStore getStore() {
562            return letter;
563        }
564    
565        public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
566            if (txid == null) {
567                return null;
568            }
569            KahaTransactionInfo rc = new KahaTransactionInfo();
570    
571            if (txid.isLocalTransaction()) {
572                LocalTransactionId t = (LocalTransactionId) txid;
573                KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
574                kahaTxId.setConnectionId(t.getConnectionId().getValue());
575                kahaTxId.setTransactionId(t.getValue());
576                rc.setLocalTransactionId(kahaTxId);
577            } else {
578                XATransactionId t = (XATransactionId) txid;
579                KahaXATransactionId kahaTxId = new KahaXATransactionId();
580                kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
581                kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
582                kahaTxId.setFormatId(t.getFormatId());
583                rc.setXaTransactionId(kahaTxId);
584            }
585            return rc;
586        }
587    
588        public Locker createDefaultLocker() throws IOException {
589            SharedFileLocker locker = new SharedFileLocker();
590            locker.configure(this);
591            return locker;
592        }
593    
594        @Override
595        public void init() throws Exception {}
596    
597        @Override
598        public String toString() {
599            String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
600            return "KahaDBPersistenceAdapter[" + path + "]";
601        }
602    
603    }