001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import org.apache.activemq.broker.BrokerService;
020 import org.apache.activemq.broker.ConnectionContext;
021 import org.apache.activemq.broker.LockableServiceSupport;
022 import org.apache.activemq.command.ActiveMQDestination;
023 import org.apache.activemq.command.ActiveMQQueue;
024 import org.apache.activemq.command.ActiveMQTopic;
025 import org.apache.activemq.command.LocalTransactionId;
026 import org.apache.activemq.command.ProducerId;
027 import org.apache.activemq.command.TransactionId;
028 import org.apache.activemq.command.XATransactionId;
029 import org.apache.activemq.protobuf.Buffer;
030 import org.apache.activemq.broker.Locker;
031 import org.apache.activemq.store.MessageStore;
032 import org.apache.activemq.store.PersistenceAdapter;
033 import org.apache.activemq.store.SharedFileLocker;
034 import org.apache.activemq.store.TopicMessageStore;
035 import org.apache.activemq.store.TransactionStore;
036 import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
037 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
038 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
039 import org.apache.activemq.usage.SystemUsage;
040 import org.apache.activemq.util.ServiceStopper;
041
042 import java.io.File;
043 import java.io.IOException;
044 import java.util.Set;
045
046 /**
047 * An implementation of {@link PersistenceAdapter} designed for use with
048 * KahaDB - Embedded Lightweight Non-Relational Database
049 *
050 * @org.apache.xbean.XBean element="kahaDB"
051 *
052 */
053 public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter {
054 private final KahaDBStore letter = new KahaDBStore();
055
056 /**
057 * @param context
058 * @throws IOException
059 * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
060 */
061 public void beginTransaction(ConnectionContext context) throws IOException {
062 this.letter.beginTransaction(context);
063 }
064
065 /**
066 * @param sync
067 * @throws IOException
068 * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
069 */
070 public void checkpoint(boolean sync) throws IOException {
071 this.letter.checkpoint(sync);
072 }
073
074 /**
075 * @param context
076 * @throws IOException
077 * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
078 */
079 public void commitTransaction(ConnectionContext context) throws IOException {
080 this.letter.commitTransaction(context);
081 }
082
083 /**
084 * @param destination
085 * @return MessageStore
086 * @throws IOException
087 * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
088 */
089 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
090 return this.letter.createQueueMessageStore(destination);
091 }
092
093 /**
094 * @param destination
095 * @return TopicMessageStore
096 * @throws IOException
097 * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
098 */
099 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
100 return this.letter.createTopicMessageStore(destination);
101 }
102
103 /**
104 * @return TransactionStore
105 * @throws IOException
106 * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
107 */
108 public TransactionStore createTransactionStore() throws IOException {
109 return this.letter.createTransactionStore();
110 }
111
112 /**
113 * @throws IOException
114 * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
115 */
116 public void deleteAllMessages() throws IOException {
117 this.letter.deleteAllMessages();
118 }
119
120 /**
121 * @return destinations
122 * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
123 */
124 public Set<ActiveMQDestination> getDestinations() {
125 return this.letter.getDestinations();
126 }
127
128 /**
129 * @return lastMessageBrokerSequenceId
130 * @throws IOException
131 * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
132 */
133 public long getLastMessageBrokerSequenceId() throws IOException {
134 return this.letter.getLastMessageBrokerSequenceId();
135 }
136
137 public long getLastProducerSequenceId(ProducerId id) throws IOException {
138 return this.letter.getLastProducerSequenceId(id);
139 }
140
141 /**
142 * @param destination
143 * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
144 */
145 public void removeQueueMessageStore(ActiveMQQueue destination) {
146 this.letter.removeQueueMessageStore(destination);
147 }
148
149 /**
150 * @param destination
151 * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
152 */
153 public void removeTopicMessageStore(ActiveMQTopic destination) {
154 this.letter.removeTopicMessageStore(destination);
155 }
156
157 /**
158 * @param context
159 * @throws IOException
160 * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
161 */
162 public void rollbackTransaction(ConnectionContext context) throws IOException {
163 this.letter.rollbackTransaction(context);
164 }
165
166 /**
167 * @param brokerName
168 * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
169 */
170 public void setBrokerName(String brokerName) {
171 this.letter.setBrokerName(brokerName);
172 }
173
174 /**
175 * @param usageManager
176 * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
177 */
178 public void setUsageManager(SystemUsage usageManager) {
179 this.letter.setUsageManager(usageManager);
180 }
181
182 /**
183 * @return the size of the store
184 * @see org.apache.activemq.store.PersistenceAdapter#size()
185 */
186 public long size() {
187 return this.letter.size();
188 }
189
190 /**
191 * @throws Exception
192 * @see org.apache.activemq.Service#start()
193 */
194 public void doStart() throws Exception {
195 this.letter.start();
196 }
197
198 /**
199 * @throws Exception
200 * @see org.apache.activemq.Service#stop()
201 */
202 public void doStop(ServiceStopper stopper) throws Exception {
203 this.letter.stop();
204 }
205
206 /**
207 * Get the journalMaxFileLength
208 *
209 * @return the journalMaxFileLength
210 */
211 public int getJournalMaxFileLength() {
212 return this.letter.getJournalMaxFileLength();
213 }
214
215 /**
216 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
217 * be used
218 *
219 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
220 */
221 public void setJournalMaxFileLength(int journalMaxFileLength) {
222 this.letter.setJournalMaxFileLength(journalMaxFileLength);
223 }
224
225 /**
226 * Set the max number of producers (LRU cache) to track for duplicate sends
227 */
228 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
229 this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
230 }
231
232 public int getMaxFailoverProducersToTrack() {
233 return this.letter.getMaxFailoverProducersToTrack();
234 }
235
236 /**
237 * set the audit window depth for duplicate suppression (should exceed the max transaction
238 * batch)
239 */
240 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
241 this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
242 }
243
244 public int getFailoverProducersAuditDepth() {
245 return this.letter.getFailoverProducersAuditDepth();
246 }
247
248 /**
249 * Get the checkpointInterval
250 *
251 * @return the checkpointInterval
252 */
253 public long getCheckpointInterval() {
254 return this.letter.getCheckpointInterval();
255 }
256
257 /**
258 * Set the checkpointInterval
259 *
260 * @param checkpointInterval
261 * the checkpointInterval to set
262 */
263 public void setCheckpointInterval(long checkpointInterval) {
264 this.letter.setCheckpointInterval(checkpointInterval);
265 }
266
267 /**
268 * Get the cleanupInterval
269 *
270 * @return the cleanupInterval
271 */
272 public long getCleanupInterval() {
273 return this.letter.getCleanupInterval();
274 }
275
276 /**
277 * Set the cleanupInterval
278 *
279 * @param cleanupInterval
280 * the cleanupInterval to set
281 */
282 public void setCleanupInterval(long cleanupInterval) {
283 this.letter.setCleanupInterval(cleanupInterval);
284 }
285
286 /**
287 * Get the indexWriteBatchSize
288 *
289 * @return the indexWriteBatchSize
290 */
291 public int getIndexWriteBatchSize() {
292 return this.letter.getIndexWriteBatchSize();
293 }
294
295 /**
296 * Set the indexWriteBatchSize
297 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
298 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
299 * @param indexWriteBatchSize
300 * the indexWriteBatchSize to set
301 */
302 public void setIndexWriteBatchSize(int indexWriteBatchSize) {
303 this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
304 }
305
306 /**
307 * Get the journalMaxWriteBatchSize
308 *
309 * @return the journalMaxWriteBatchSize
310 */
311 public int getJournalMaxWriteBatchSize() {
312 return this.letter.getJournalMaxWriteBatchSize();
313 }
314
315 /**
316 * Set the journalMaxWriteBatchSize
317 * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
318 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
319 * @param journalMaxWriteBatchSize
320 * the journalMaxWriteBatchSize to set
321 */
322 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
323 this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
324 }
325
326 /**
327 * Get the enableIndexWriteAsync
328 *
329 * @return the enableIndexWriteAsync
330 */
331 public boolean isEnableIndexWriteAsync() {
332 return this.letter.isEnableIndexWriteAsync();
333 }
334
335 /**
336 * Set the enableIndexWriteAsync
337 *
338 * @param enableIndexWriteAsync
339 * the enableIndexWriteAsync to set
340 */
341 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
342 this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
343 }
344
345 /**
346 * Get the directory
347 *
348 * @return the directory
349 */
350 public File getDirectory() {
351 return this.letter.getDirectory();
352 }
353
354 /**
355 * @param dir
356 * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
357 */
358 public void setDirectory(File dir) {
359 this.letter.setDirectory(dir);
360 }
361
362 /**
363 * Get the enableJournalDiskSyncs
364 *
365 * @return the enableJournalDiskSyncs
366 */
367 public boolean isEnableJournalDiskSyncs() {
368 return this.letter.isEnableJournalDiskSyncs();
369 }
370
371 /**
372 * Set the enableJournalDiskSyncs
373 *
374 * @param enableJournalDiskSyncs
375 * the enableJournalDiskSyncs to set
376 */
377 public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
378 this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
379 }
380
381 /**
382 * Get the indexCacheSize
383 *
384 * @return the indexCacheSize
385 */
386 public int getIndexCacheSize() {
387 return this.letter.getIndexCacheSize();
388 }
389
390 /**
391 * Set the indexCacheSize
392 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
393 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
394 * @param indexCacheSize
395 * the indexCacheSize to set
396 */
397 public void setIndexCacheSize(int indexCacheSize) {
398 this.letter.setIndexCacheSize(indexCacheSize);
399 }
400
401 /**
402 * Get the ignoreMissingJournalfiles
403 *
404 * @return the ignoreMissingJournalfiles
405 */
406 public boolean isIgnoreMissingJournalfiles() {
407 return this.letter.isIgnoreMissingJournalfiles();
408 }
409
410 /**
411 * Set the ignoreMissingJournalfiles
412 *
413 * @param ignoreMissingJournalfiles
414 * the ignoreMissingJournalfiles to set
415 */
416 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
417 this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
418 }
419
420 public boolean isChecksumJournalFiles() {
421 return letter.isChecksumJournalFiles();
422 }
423
424 public boolean isCheckForCorruptJournalFiles() {
425 return letter.isCheckForCorruptJournalFiles();
426 }
427
428 public void setChecksumJournalFiles(boolean checksumJournalFiles) {
429 letter.setChecksumJournalFiles(checksumJournalFiles);
430 }
431
432 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
433 letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
434 }
435
436 public void setBrokerService(BrokerService brokerService) {
437 letter.setBrokerService(brokerService);
438 }
439
440 public boolean isArchiveDataLogs() {
441 return letter.isArchiveDataLogs();
442 }
443
444 public void setArchiveDataLogs(boolean archiveDataLogs) {
445 letter.setArchiveDataLogs(archiveDataLogs);
446 }
447
448 public File getDirectoryArchive() {
449 return letter.getDirectoryArchive();
450 }
451
452 public void setDirectoryArchive(File directoryArchive) {
453 letter.setDirectoryArchive(directoryArchive);
454 }
455
456 public boolean isConcurrentStoreAndDispatchQueues() {
457 return letter.isConcurrentStoreAndDispatchQueues();
458 }
459
460 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
461 letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
462 }
463
464 public boolean isConcurrentStoreAndDispatchTopics() {
465 return letter.isConcurrentStoreAndDispatchTopics();
466 }
467
468 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
469 letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
470 }
471
472 public int getMaxAsyncJobs() {
473 return letter.getMaxAsyncJobs();
474 }
475 /**
476 * @param maxAsyncJobs
477 * the maxAsyncJobs to set
478 */
479 public void setMaxAsyncJobs(int maxAsyncJobs) {
480 letter.setMaxAsyncJobs(maxAsyncJobs);
481 }
482
483 /**
484 * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
485 *
486 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
487 */
488 @Deprecated
489 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
490 getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
491 }
492
493 public boolean getForceRecoverIndex() {
494 return letter.getForceRecoverIndex();
495 }
496
497 public void setForceRecoverIndex(boolean forceRecoverIndex) {
498 letter.setForceRecoverIndex(forceRecoverIndex);
499 }
500
501 public boolean isArchiveCorruptedIndex() {
502 return letter.isArchiveCorruptedIndex();
503 }
504
505 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
506 letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
507 }
508
509 /**
510 * When true, persist the redelivery status such that the message redelivery flag can survive a broker failure
511 * used with org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean) true
512 */
513 public void setRewriteOnRedelivery(boolean rewriteOnRedelivery) {
514 letter.setRewriteOnRedelivery(rewriteOnRedelivery);
515 }
516
517 public boolean isRewriteOnRedelivery() {
518 return letter.isRewriteOnRedelivery();
519 }
520
521 public float getIndexLFUEvictionFactor() {
522 return letter.getIndexLFUEvictionFactor();
523 }
524
525 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
526 letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
527 }
528
529 public boolean isUseIndexLFRUEviction() {
530 return letter.isUseIndexLFRUEviction();
531 }
532
533 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
534 letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
535 }
536
537 public void setEnableIndexDiskSyncs(boolean diskSyncs) {
538 letter.setEnableIndexDiskSyncs(diskSyncs);
539 }
540
541 public boolean isEnableIndexDiskSyncs() {
542 return letter.isEnableIndexDiskSyncs();
543 }
544
545 public void setEnableIndexRecoveryFile(boolean enable) {
546 letter.setEnableIndexRecoveryFile(enable);
547 }
548
549 public boolean isEnableIndexRecoveryFile() {
550 return letter.isEnableIndexRecoveryFile();
551 }
552
553 public void setEnableIndexPageCaching(boolean enable) {
554 letter.setEnableIndexPageCaching(enable);
555 }
556
557 public boolean isEnableIndexPageCaching() {
558 return letter.isEnableIndexPageCaching();
559 }
560
561 public KahaDBStore getStore() {
562 return letter;
563 }
564
565 public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
566 if (txid == null) {
567 return null;
568 }
569 KahaTransactionInfo rc = new KahaTransactionInfo();
570
571 if (txid.isLocalTransaction()) {
572 LocalTransactionId t = (LocalTransactionId) txid;
573 KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
574 kahaTxId.setConnectionId(t.getConnectionId().getValue());
575 kahaTxId.setTransactionId(t.getValue());
576 rc.setLocalTransactionId(kahaTxId);
577 } else {
578 XATransactionId t = (XATransactionId) txid;
579 KahaXATransactionId kahaTxId = new KahaXATransactionId();
580 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
581 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
582 kahaTxId.setFormatId(t.getFormatId());
583 rc.setXaTransactionId(kahaTxId);
584 }
585 return rc;
586 }
587
588 public Locker createDefaultLocker() throws IOException {
589 SharedFileLocker locker = new SharedFileLocker();
590 locker.configure(this);
591 return locker;
592 }
593
594 @Override
595 public void init() throws Exception {}
596
597 @Override
598 public String toString() {
599 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
600 return "KahaDBPersistenceAdapter[" + path + "]";
601 }
602
603 }