001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;
020    
021    import java.io.File;
022    import java.io.IOException;
023    import java.util.Set;
024    import java.util.concurrent.Callable;
025    
026    import javax.management.ObjectName;
027    
028    import org.apache.activemq.broker.BrokerService;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.broker.LockableServiceSupport;
031    import org.apache.activemq.broker.Locker;
032    import org.apache.activemq.broker.jmx.AnnotatedMBean;
033    import org.apache.activemq.broker.jmx.PersistenceAdapterView;
034    import org.apache.activemq.command.ActiveMQDestination;
035    import org.apache.activemq.command.ActiveMQQueue;
036    import org.apache.activemq.command.ActiveMQTopic;
037    import org.apache.activemq.command.LocalTransactionId;
038    import org.apache.activemq.command.ProducerId;
039    import org.apache.activemq.command.TransactionId;
040    import org.apache.activemq.command.XATransactionId;
041    import org.apache.activemq.protobuf.Buffer;
042    import org.apache.activemq.store.JournaledStore;
043    import org.apache.activemq.store.MessageStore;
044    import org.apache.activemq.store.PersistenceAdapter;
045    import org.apache.activemq.store.SharedFileLocker;
046    import org.apache.activemq.store.TopicMessageStore;
047    import org.apache.activemq.store.TransactionStore;
048    import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
049    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
050    import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
051    import org.apache.activemq.usage.SystemUsage;
052    import org.apache.activemq.util.ServiceStopper;
053    
054    /**
055     * An implementation of {@link PersistenceAdapter} designed for use with
056     * KahaDB - Embedded Lightweight Non-Relational Database
057     *
058     * @org.apache.xbean.XBean element="kahaDB"
059     *
060     */
061    public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore {
062        private final KahaDBStore letter = new KahaDBStore();
063    
064        /**
065         * @param context
066         * @throws IOException
067         * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
068         */
069        @Override
070        public void beginTransaction(ConnectionContext context) throws IOException {
071            this.letter.beginTransaction(context);
072        }
073    
074        /**
075         * @param sync
076         * @throws IOException
077         * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
078         */
079        @Override
080        public void checkpoint(boolean sync) throws IOException {
081            this.letter.checkpoint(sync);
082        }
083    
084        /**
085         * @param context
086         * @throws IOException
087         * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
088         */
089        @Override
090        public void commitTransaction(ConnectionContext context) throws IOException {
091            this.letter.commitTransaction(context);
092        }
093    
094        /**
095         * @param destination
096         * @return MessageStore
097         * @throws IOException
098         * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
099         */
100        @Override
101        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
102            return this.letter.createQueueMessageStore(destination);
103        }
104    
105        /**
106         * @param destination
107         * @return TopicMessageStore
108         * @throws IOException
109         * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
110         */
111        @Override
112        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
113            return this.letter.createTopicMessageStore(destination);
114        }
115    
116        /**
117         * @return TransactionStore
118         * @throws IOException
119         * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
120         */
121        @Override
122        public TransactionStore createTransactionStore() throws IOException {
123            return this.letter.createTransactionStore();
124        }
125    
126        /**
127         * @throws IOException
128         * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
129         */
130        @Override
131        public void deleteAllMessages() throws IOException {
132            this.letter.deleteAllMessages();
133        }
134    
135        /**
136         * @return destinations
137         * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
138         */
139        @Override
140        public Set<ActiveMQDestination> getDestinations() {
141            return this.letter.getDestinations();
142        }
143    
144        /**
145         * @return lastMessageBrokerSequenceId
146         * @throws IOException
147         * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
148         */
149        @Override
150        public long getLastMessageBrokerSequenceId() throws IOException {
151            return this.letter.getLastMessageBrokerSequenceId();
152        }
153    
154        @Override
155        public long getLastProducerSequenceId(ProducerId id) throws IOException {
156            return this.letter.getLastProducerSequenceId(id);
157        }
158    
159        /**
160         * @param destination
161         * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
162         */
163        @Override
164        public void removeQueueMessageStore(ActiveMQQueue destination) {
165            this.letter.removeQueueMessageStore(destination);
166        }
167    
168        /**
169         * @param destination
170         * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
171         */
172        @Override
173        public void removeTopicMessageStore(ActiveMQTopic destination) {
174            this.letter.removeTopicMessageStore(destination);
175        }
176    
177        /**
178         * @param context
179         * @throws IOException
180         * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
181         */
182        @Override
183        public void rollbackTransaction(ConnectionContext context) throws IOException {
184            this.letter.rollbackTransaction(context);
185        }
186    
187        /**
188         * @param brokerName
189         * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
190         */
191        @Override
192        public void setBrokerName(String brokerName) {
193            this.letter.setBrokerName(brokerName);
194        }
195    
196        /**
197         * @param usageManager
198         * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
199         */
200        @Override
201        public void setUsageManager(SystemUsage usageManager) {
202            this.letter.setUsageManager(usageManager);
203        }
204    
205        /**
206         * @return the size of the store
207         * @see org.apache.activemq.store.PersistenceAdapter#size()
208         */
209        @Override
210        public long size() {
211            return this.letter.size();
212        }
213    
214        /**
215         * @throws Exception
216         * @see org.apache.activemq.Service#start()
217         */
218        @Override
219        public void doStart() throws Exception {
220            this.letter.start();
221    
222            if (brokerService != null && brokerService.isUseJmx()) {
223                PersistenceAdapterView view = new PersistenceAdapterView(this);
224                view.setInflightTransactionViewCallable(new Callable<String>() {
225                    @Override
226                    public String call() throws Exception {
227                        return letter.getTransactions();
228                    }
229                });
230                view.setDataViewCallable(new Callable<String>() {
231                    @Override
232                    public String call() throws Exception {
233                        return letter.getJournal().getFileMap().keySet().toString();
234                    }
235                });
236                AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view,
237                        createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString()));
238            }
239        }
240    
241        /**
242         * @throws Exception
243         * @see org.apache.activemq.Service#stop()
244         */
245        @Override
246        public void doStop(ServiceStopper stopper) throws Exception {
247            this.letter.stop();
248    
249            if (brokerService != null && brokerService.isUseJmx()) {
250                ObjectName brokerObjectName = brokerService.getBrokerObjectName();
251                brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString()));
252            }
253        }
254    
255        /**
256         * Get the journalMaxFileLength
257         *
258         * @return the journalMaxFileLength
259         */
260        @Override
261        public int getJournalMaxFileLength() {
262            return this.letter.getJournalMaxFileLength();
263        }
264    
265        /**
266         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
267         * be used
268         *
269         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
270         */
271        public void setJournalMaxFileLength(int journalMaxFileLength) {
272            this.letter.setJournalMaxFileLength(journalMaxFileLength);
273        }
274    
275        /**
276         * Set the max number of producers (LRU cache) to track for duplicate sends
277         */
278        public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
279            this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
280        }
281    
282        public int getMaxFailoverProducersToTrack() {
283            return this.letter.getMaxFailoverProducersToTrack();
284        }
285    
286        /**
287         * set the audit window depth for duplicate suppression (should exceed the max transaction
288         * batch)
289         */
290        public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
291            this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
292        }
293    
294        public int getFailoverProducersAuditDepth() {
295            return this.letter.getFailoverProducersAuditDepth();
296        }
297    
298        /**
299         * Get the checkpointInterval
300         *
301         * @return the checkpointInterval
302         */
303        public long getCheckpointInterval() {
304            return this.letter.getCheckpointInterval();
305        }
306    
307        /**
308         * Set the checkpointInterval
309         *
310         * @param checkpointInterval
311         *            the checkpointInterval to set
312         */
313        public void setCheckpointInterval(long checkpointInterval) {
314            this.letter.setCheckpointInterval(checkpointInterval);
315        }
316    
317        /**
318         * Get the cleanupInterval
319         *
320         * @return the cleanupInterval
321         */
322        public long getCleanupInterval() {
323            return this.letter.getCleanupInterval();
324        }
325    
326        /**
327         * Set the cleanupInterval
328         *
329         * @param cleanupInterval
330         *            the cleanupInterval to set
331         */
332        public void setCleanupInterval(long cleanupInterval) {
333            this.letter.setCleanupInterval(cleanupInterval);
334        }
335    
336        /**
337         * Get the indexWriteBatchSize
338         *
339         * @return the indexWriteBatchSize
340         */
341        public int getIndexWriteBatchSize() {
342            return this.letter.getIndexWriteBatchSize();
343        }
344    
345        /**
346         * Set the indexWriteBatchSize
347         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
348         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
349         * @param indexWriteBatchSize
350         *            the indexWriteBatchSize to set
351         */
352        public void setIndexWriteBatchSize(int indexWriteBatchSize) {
353            this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
354        }
355    
356        /**
357         * Get the journalMaxWriteBatchSize
358         *
359         * @return the journalMaxWriteBatchSize
360         */
361        public int getJournalMaxWriteBatchSize() {
362            return this.letter.getJournalMaxWriteBatchSize();
363        }
364    
365        /**
366         * Set the journalMaxWriteBatchSize
367         *  * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
368         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
369         * @param journalMaxWriteBatchSize
370         *            the journalMaxWriteBatchSize to set
371         */
372        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
373            this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
374        }
375    
376        /**
377         * Get the enableIndexWriteAsync
378         *
379         * @return the enableIndexWriteAsync
380         */
381        public boolean isEnableIndexWriteAsync() {
382            return this.letter.isEnableIndexWriteAsync();
383        }
384    
385        /**
386         * Set the enableIndexWriteAsync
387         *
388         * @param enableIndexWriteAsync
389         *            the enableIndexWriteAsync to set
390         */
391        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
392            this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
393        }
394    
395        /**
396         * Get the directory
397         *
398         * @return the directory
399         */
400        @Override
401        public File getDirectory() {
402            return this.letter.getDirectory();
403        }
404    
405        /**
406         * @param dir
407         * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
408         */
409        @Override
410        public void setDirectory(File dir) {
411            this.letter.setDirectory(dir);
412        }
413    
414        /**
415         * Get the enableJournalDiskSyncs
416         *
417         * @return the enableJournalDiskSyncs
418         */
419        public boolean isEnableJournalDiskSyncs() {
420            return this.letter.isEnableJournalDiskSyncs();
421        }
422    
423        /**
424         * Set the enableJournalDiskSyncs
425         *
426         * @param enableJournalDiskSyncs
427         *            the enableJournalDiskSyncs to set
428         */
429        public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
430            this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
431        }
432    
433        /**
434         * Get the indexCacheSize
435         *
436         * @return the indexCacheSize
437         */
438        public int getIndexCacheSize() {
439            return this.letter.getIndexCacheSize();
440        }
441    
442        /**
443         * Set the indexCacheSize
444         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
445         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
446         * @param indexCacheSize
447         *            the indexCacheSize to set
448         */
449        public void setIndexCacheSize(int indexCacheSize) {
450            this.letter.setIndexCacheSize(indexCacheSize);
451        }
452    
453        /**
454         * Get the ignoreMissingJournalfiles
455         *
456         * @return the ignoreMissingJournalfiles
457         */
458        public boolean isIgnoreMissingJournalfiles() {
459            return this.letter.isIgnoreMissingJournalfiles();
460        }
461    
462        /**
463         * Set the ignoreMissingJournalfiles
464         *
465         * @param ignoreMissingJournalfiles
466         *            the ignoreMissingJournalfiles to set
467         */
468        public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
469            this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
470        }
471    
472        public boolean isChecksumJournalFiles() {
473            return letter.isChecksumJournalFiles();
474        }
475    
476        public boolean isCheckForCorruptJournalFiles() {
477            return letter.isCheckForCorruptJournalFiles();
478        }
479    
480        public void setChecksumJournalFiles(boolean checksumJournalFiles) {
481            letter.setChecksumJournalFiles(checksumJournalFiles);
482        }
483    
484        public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
485            letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
486        }
487    
488        @Override
489        public void setBrokerService(BrokerService brokerService) {
490            super.setBrokerService(brokerService);
491            letter.setBrokerService(brokerService);
492        }
493    
494        public boolean isArchiveDataLogs() {
495            return letter.isArchiveDataLogs();
496        }
497    
498        public void setArchiveDataLogs(boolean archiveDataLogs) {
499            letter.setArchiveDataLogs(archiveDataLogs);
500        }
501    
502        public File getDirectoryArchive() {
503            return letter.getDirectoryArchive();
504        }
505    
506        public void setDirectoryArchive(File directoryArchive) {
507            letter.setDirectoryArchive(directoryArchive);
508        }
509    
510        public boolean isConcurrentStoreAndDispatchQueues() {
511            return letter.isConcurrentStoreAndDispatchQueues();
512        }
513    
514        public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
515            letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
516        }
517    
518        public boolean isConcurrentStoreAndDispatchTopics() {
519            return letter.isConcurrentStoreAndDispatchTopics();
520        }
521    
522        public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
523            letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
524        }
525    
526        public int getMaxAsyncJobs() {
527            return letter.getMaxAsyncJobs();
528        }
529        /**
530         * @param maxAsyncJobs
531         *            the maxAsyncJobs to set
532         */
533        public void setMaxAsyncJobs(int maxAsyncJobs) {
534            letter.setMaxAsyncJobs(maxAsyncJobs);
535        }
536    
537        /**
538         * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
539         *
540         * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
541         */
542        @Deprecated
543        public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
544           getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
545        }
546    
547        public boolean getForceRecoverIndex() {
548            return letter.getForceRecoverIndex();
549        }
550    
551        public void setForceRecoverIndex(boolean forceRecoverIndex) {
552            letter.setForceRecoverIndex(forceRecoverIndex);
553        }
554    
555        public boolean isArchiveCorruptedIndex() {
556            return letter.isArchiveCorruptedIndex();
557        }
558    
559        public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
560            letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
561        }
562    
563        /**
564         * When true, persist the redelivery status such that the message redelivery flag can survive a broker failure
565         * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean)  true
566         */
567        public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
568            letter.setRewriteOnRedelivery(rewriteOnRedelivery);
569        }
570    
571        public boolean isRewriteOnRedelivery() {
572            return letter.isRewriteOnRedelivery();
573        }
574    
575        public float getIndexLFUEvictionFactor() {
576            return letter.getIndexLFUEvictionFactor();
577        }
578    
579        public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
580            letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
581        }
582    
583        public boolean isUseIndexLFRUEviction() {
584            return letter.isUseIndexLFRUEviction();
585        }
586    
587        public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
588            letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
589        }
590    
591        public void setEnableIndexDiskSyncs(boolean diskSyncs) {
592            letter.setEnableIndexDiskSyncs(diskSyncs);
593        }
594    
595        public boolean isEnableIndexDiskSyncs() {
596            return letter.isEnableIndexDiskSyncs();
597        }
598    
599        public void setEnableIndexRecoveryFile(boolean enable) {
600            letter.setEnableIndexRecoveryFile(enable);
601        }
602    
603        public boolean  isEnableIndexRecoveryFile() {
604            return letter.isEnableIndexRecoveryFile();
605        }
606    
607        public void setEnableIndexPageCaching(boolean enable) {
608            letter.setEnableIndexPageCaching(enable);
609        }
610    
611        public boolean isEnableIndexPageCaching() {
612            return letter.isEnableIndexPageCaching();
613        }
614    
615        public KahaDBStore getStore() {
616            return letter;
617        }
618    
619        public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
620            if (txid == null) {
621                return null;
622            }
623            KahaTransactionInfo rc = new KahaTransactionInfo();
624    
625            if (txid.isLocalTransaction()) {
626                LocalTransactionId t = (LocalTransactionId) txid;
627                KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
628                kahaTxId.setConnectionId(t.getConnectionId().getValue());
629                kahaTxId.setTransactionId(t.getValue());
630                rc.setLocalTransactionId(kahaTxId);
631            } else {
632                XATransactionId t = (XATransactionId) txid;
633                KahaXATransactionId kahaTxId = new KahaXATransactionId();
634                kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
635                kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
636                kahaTxId.setFormatId(t.getFormatId());
637                rc.setXaTransactionId(kahaTxId);
638            }
639            return rc;
640        }
641    
642        @Override
643        public Locker createDefaultLocker() throws IOException {
644            SharedFileLocker locker = new SharedFileLocker();
645            locker.configure(this);
646            return locker;
647        }
648    
649        @Override
650        public void init() throws Exception {}
651    
652        @Override
653        public String toString() {
654            String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
655            return "KahaDBPersistenceAdapter[" + path + "]";
656        }
657    
658    }