public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware
Modifier and Type | Class and Description |
---|---|
protected class |
MessageDatabase.LastAckMarshaller |
protected class |
MessageDatabase.MessageKeysMarshaller |
protected class |
MessageDatabase.Metadata |
static class |
MessageDatabase.PurgeRecoveredXATransactionStrategy |
protected class |
MessageDatabase.StoredDestinationMarshaller |
Constructor and Description |
---|
MessageDatabase() |
Modifier and Type | Method and Description |
---|---|
void |
allowIOResumption() |
protected void |
checkpointCleanup(boolean cleanup) |
protected void |
clearStoreStats(KahaDestination kahaDestination)
Clear the counter for the destination, if one exists.
|
void |
close() |
protected abstract void |
configureMetadata() |
protected Journal |
createJournal() |
protected void |
decrementAndSubSizeToStoreStat(KahaDestination kahaDestination,
long size) |
protected void |
decrementAndSubSizeToStoreStat(KahaDestination kahaDestination,
String subKey,
long size) |
protected void |
decrementAndSubSizeToStoreStat(String kahaDestKey,
long size) |
protected void |
decrementAndSubSizeToStoreStat(String kahaDestKey,
String subKey,
long size) |
void |
doStart() |
void |
doStop(ServiceStopper stopper) |
void |
forgetRecoveredAcks(ArrayList<MessageAck> acks,
boolean rollback) |
long |
getCheckpointInterval() |
long |
getCleanupInterval() |
int |
getCompactAcksAfterNoGC() |
File |
getDirectory() |
File |
getDirectoryArchive() |
protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination |
getExistingStoredDestination(KahaDestination destination,
Transaction tx) |
int |
getFailoverProducersAuditDepth() |
int |
getIndexCacheSize() |
File |
getIndexDirectory() |
float |
getIndexLFUEvictionFactor() |
int |
getIndexWriteBatchSize() |
Location[] |
getInProgressTxLocationRange() |
Journal |
getJournal() |
long |
getJournalDiskSyncInterval() |
String |
getJournalDiskSyncStrategy() |
Journal.JournalDiskSyncStrategy |
getJournalDiskSyncStrategyEnum() |
HashSet<Integer> |
getJournalFilesBeingReplicated() |
int |
getJournalMaxFileLength() |
int |
getJournalMaxWriteBatchSize() |
org.apache.activemq.store.kahadb.MessageDatabase.LastAck |
getLastAck(Transaction tx,
org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
String subscriptionKey) |
Location |
getLastUpdatePosition() |
int |
getMaxFailoverProducersToTrack() |
protected MessageDatabase.Metadata |
getMetadata() |
PageFile |
getPageFile() |
PersistenceAdapterStatistics |
getPersistenceAdapterStatistics() |
String |
getPreallocationScope() |
String |
getPreallocationStrategy() |
String |
getPurgeRecoveredXATransactionStrategy() |
MessageDatabase.PurgeRecoveredXATransactionStrategy |
getPurgeRecoveredXATransactionStrategyEnum() |
protected SequenceSet |
getSequenceSet(Transaction tx,
org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
String subscriptionKey) |
protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination |
getStoredDestination(KahaDestination destination,
Transaction tx) |
protected long |
getStoredMessageCount(Transaction tx,
org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
String subscriptionKey) |
protected Map<String,AtomicLong> |
getStoredMessageSize(Transaction tx,
org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
List<String> subscriptionKeys)
Recovers durable subscription pending message size with only 1 pass over the order index on recovery
instead of iterating over the index once per subscription
|
protected long |
getStoredMessageSize(Transaction tx,
org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
String subscriptionKey) |
protected MessageStoreStatistics |
getStoreStats(String kahaDestKey)
Locate the storeMessageSize counter for this KahaDestination
|
protected MessageStoreSubscriptionStatistics |
getSubStats(String kahaDestKey) |
String |
getTransactions() |
void |
incrementalRecover() |
protected void |
incrementAndAddSizeToStoreStat(KahaDestination kahaDestination,
long size)
Update MessageStoreStatistics
|
protected void |
incrementAndAddSizeToStoreStat(KahaDestination kahaDestination,
String subKey,
long size) |
protected void |
incrementAndAddSizeToStoreStat(String kahaDestKey,
long size) |
protected void |
incrementAndAddSizeToStoreStat(String kahaDestKey,
String subKey,
long size) |
boolean |
isArchiveCorruptedIndex() |
boolean |
isArchiveDataLogs() |
boolean |
isCheckForCorruptJournalFiles() |
boolean |
isChecksumJournalFiles() |
boolean |
isCompactAcksIgnoresStoreGrowth()
Returns whether Ack compaction will ignore that the store is still growing
and run more often.
|
boolean |
isDeleteAllMessages() |
boolean |
isEnableAckCompaction()
Returns whether Ack compaction is enabled
|
boolean |
isEnableIndexDiskSyncs() |
boolean |
isEnableIndexPageCaching() |
boolean |
isEnableIndexRecoveryFile() |
boolean |
isEnableJournalDiskSyncs()
Deprecated. Use getJournalDiskSyncStrategyEnum() or getJournalDiskSyncStrategy() instead. |
boolean |
isEnableSubscriptionStatistics() |
boolean |
isFailIfDatabaseIsLocked() |
boolean |
isIgnoreMissingJournalfiles() |
boolean |
isUseIndexLFRUEviction() |
protected String |
key(KahaDestination destination) |
void |
load() |
JournalCommand<?> |
load(Location location)
Loads a previously stored JournalMessage
|
protected boolean |
matchType(Destination destination,
KahaDestination.DestinationType type)
Determine whether this Destination matches the DestinationType
|
void |
open() |
protected void |
process(KahaAddMessageCommand command,
Location location,
org.apache.activemq.store.kahadb.MessageDatabase.IndexAware runWithIndexLock) |
protected void |
process(KahaCommitCommand command,
Location location,
org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before) |
protected void |
process(KahaPrepareCommand command,
Location location) |
protected void |
process(KahaRemoveDestinationCommand command,
Location location) |
protected void |
process(KahaRemoveMessageCommand command,
Location location) |
protected void |
process(KahaRewrittenDataFileCommand command,
Location location) |
protected void |
process(KahaRollbackCommand command,
Location location) |
protected void |
process(KahaSubscriptionCommand command,
Location location) |
protected void |
process(KahaUpdateMessageCommand command,
Location location) |
protected void |
processLocation(Location location) |
protected void |
recoverIndex(Transaction tx) |
void |
setArchiveCorruptedIndex(boolean archiveCorruptedIndex) |
void |
setArchiveDataLogs(boolean archiveDataLogs) |
void |
setBrokerService(BrokerService brokerService) |
void |
setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) |
void |
setCheckpointInterval(long checkpointInterval) |
void |
setChecksumJournalFiles(boolean checksumJournalFiles) |
void |
setCleanupInterval(long cleanupInterval) |
void |
setCompactAcksAfterNoGC(int compactAcksAfterNoGC)
Sets the number of GC cycles where no journal logs were removed before an attempt to
move forward all the acks in the last log that contains them and is otherwise unreferenced.
|
void |
setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth)
Configure if Ack compaction will occur regardless of continued growth of the
journal logs meaning that the store has not run out of space yet.
|
void |
setDeleteAllMessages(boolean deleteAllMessages) |
void |
setDirectory(File directory) |
void |
setDirectoryArchive(File directoryArchive) |
void |
setEnableAckCompaction(boolean enableAckCompaction)
Configure if the Ack compaction task should be enabled to run
|
void |
setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) |
void |
setEnableIndexPageCaching(boolean enableIndexPageCaching) |
void |
setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) |
void |
setEnableIndexWriteAsync(boolean enableIndexWriteAsync) |
void |
setEnableJournalDiskSyncs(boolean syncWrites)
Deprecated. Use setJournalDiskSyncStrategy(String) instead. |
void |
setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics)
Enable caching statistics for each subscription to allow non-blocking
retrieval of metrics.
|
void |
setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) |
void |
setFailoverProducersAuditDepth(int failoverProducersAuditDepth) |
void |
setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) |
void |
setIndexCacheSize(int indexCacheSize) |
void |
setIndexDirectory(File indexDirectory) |
void |
setIndexLFUEvictionFactor(float indexLFUEvictionFactor) |
void |
setIndexWriteBatchSize(int setIndexWriteBatchSize) |
void |
setJournalDiskSyncInterval(long journalDiskSyncInterval) |
void |
setJournalDiskSyncStrategy(String journalDiskSyncStrategy) |
void |
setJournalMaxFileLength(int journalMaxFileLength) |
void |
setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) |
void |
setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) |
void |
setPreallocationScope(String preallocationScope) |
void |
setPreallocationStrategy(String preallocationStrategy) |
void |
setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy) |
void |
setUseIndexLFRUEviction(boolean useIndexLFRUEviction) |
Location |
store(JournalCommand<?> data) |
Location |
store(JournalCommand<?> data,
boolean sync,
org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before,
Runnable after) |
Location |
store(JournalCommand<?> data,
boolean sync,
org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before,
Runnable after,
Runnable onJournalStoreComplete)
All updates are funneled through this method.
|
Location |
store(JournalCommand<?> data,
Runnable onJournalStoreComplete) |
ByteSequence |
toByteSequence(JournalCommand<?> data) |
void |
trackRecoveredAcks(ArrayList<MessageAck> acks) |
void |
unload() |
addServiceListener, dispose, isStarted, isStopped, isStopping, postStop, preStart, removeServiceListener, start, stop
protected BrokerService brokerService
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME
public static final int LOG_SLOW_ACCESS_TIME
public static final File DEFAULT_DIRECTORY
protected static final org.apache.activemq.protobuf.Buffer UNMATCHED
protected MessageDatabase.Metadata metadata
protected final PersistenceAdapterStatistics persistenceAdapterStatistics
protected org.apache.activemq.store.kahadb.MessageDatabase.MetadataMarshaller metadataMarshaller
protected boolean failIfDatabaseIsLocked
protected boolean deleteAllMessages
protected File indexDirectory
protected ScheduledExecutorService scheduler
protected Journal.JournalDiskSyncStrategy journalDiskSyncStrategy
protected boolean archiveDataLogs
protected File directoryArchive
protected AtomicLong journalSize
protected AtomicBoolean opened
protected MessageDatabase.PurgeRecoveredXATransactionStrategy purgeRecoveredXATransactionStrategy
protected boolean forceRecoverIndex
protected final AtomicReference<Location> lastAsyncJournalUpdate
protected final ReentrantReadWriteLock indexLock
protected final HashMap<String,org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination> storedDestinations
protected final ConcurrentMap<String,MessageStore> storeCache
protected final LinkedHashMap<TransactionId,List<org.apache.activemq.store.kahadb.MessageDatabase.Operation>> preparedTransactions
protected final Set<String> ackedAndPrepared
protected final Set<String> rolledBackAcks
public MessageDatabase()
public void doStart() throws Exception
doStart
in class ServiceSupport
Exception
public void doStop(ServiceStopper stopper) throws Exception
doStop
in class ServiceSupport
Exception
public void allowIOResumption()
public void open() throws IOException
IOException
public void load() throws IOException
IOException
public void close() throws IOException, InterruptedException
IOException
InterruptedException
public void unload() throws IOException, InterruptedException
IOException
InterruptedException
public Location[] getInProgressTxLocationRange()
public String getTransactions()
protected void recoverIndex(Transaction tx) throws IOException
IOException
public void incrementalRecover() throws IOException
IOException
public Location getLastUpdatePosition() throws IOException
IOException
protected void checkpointCleanup(boolean cleanup) throws IOException
IOException
public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException
IOException
public Location store(JournalCommand<?> data) throws IOException
IOException
public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException
IOException
public Location store(JournalCommand<?> data, boolean sync, org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before, Runnable after) throws IOException
IOException
public Location store(JournalCommand<?> data, boolean sync, org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException
IOException
public JournalCommand<?> load(Location location) throws IOException
location -
IOException
protected void process(KahaAddMessageCommand command, Location location, org.apache.activemq.store.kahadb.MessageDatabase.IndexAware runWithIndexLock) throws IOException
IOException
protected void process(KahaUpdateMessageCommand command, Location location) throws IOException
IOException
protected void process(KahaRemoveMessageCommand command, Location location) throws IOException
IOException
protected void process(KahaRemoveDestinationCommand command, Location location) throws IOException
IOException
protected void process(KahaSubscriptionCommand command, Location location) throws IOException
IOException
protected void processLocation(Location location)
protected void process(KahaCommitCommand command, Location location, org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before) throws IOException
IOException
protected void process(KahaPrepareCommand command, Location location)
protected void process(KahaRollbackCommand command, Location location) throws IOException
IOException
protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException
IOException
public HashSet<Integer> getJournalFilesBeingReplicated()
protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException
IOException
protected org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException
IOException
protected void clearStoreStats(KahaDestination kahaDestination)
kahaDestination -
protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size)
kahaDestination -
size -
protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size)
protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size)
protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size)
protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size)
protected void incrementAndAddSizeToStoreStat(String kahaDestKey, String subKey, long size)
protected void decrementAndSubSizeToStoreStat(String kahaDestKey, String subKey, long size)
protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, String subKey, long size)
protected MessageStoreStatistics getStoreStats(String kahaDestKey)
protected MessageStoreSubscriptionStatistics getSubStats(String kahaDestKey)
protected boolean matchType(Destination destination, KahaDestination.DestinationType type)
destination -
type -
public org.apache.activemq.store.kahadb.MessageDatabase.LastAck getLastAck(Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, String subscriptionKey) throws IOException
IOException
protected SequenceSet getSequenceSet(Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, String subscriptionKey) throws IOException
IOException
protected long getStoredMessageCount(Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, String subscriptionKey) throws IOException
IOException
protected Map<String,AtomicLong> getStoredMessageSize(Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, List<String> subscriptionKeys) throws IOException
tx -
sd -
subscriptionKeys -
IOException
protected long getStoredMessageSize(Transaction tx, org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd, String subscriptionKey) throws IOException
IOException
protected String key(KahaDestination destination)
public void trackRecoveredAcks(ArrayList<MessageAck> acks)
public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException
IOException
protected Journal createJournal() throws IOException
IOException
protected abstract void configureMetadata()
public int getJournalMaxWriteBatchSize()
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)
public File getDirectory()
public void setDirectory(File directory)
public boolean isDeleteAllMessages()
public void setDeleteAllMessages(boolean deleteAllMessages)
public void setIndexWriteBatchSize(int setIndexWriteBatchSize)
public int getIndexWriteBatchSize()
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)
@Deprecated public boolean isEnableJournalDiskSyncs()
Deprecated. Use getJournalDiskSyncStrategyEnum() or getJournalDiskSyncStrategy() instead.
@Deprecated public void setEnableJournalDiskSyncs(boolean syncWrites)
Deprecated. Use setJournalDiskSyncStrategy(String) instead.
syncWrites -
public Journal.JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum()
public String getJournalDiskSyncStrategy()
public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy)
public long getJournalDiskSyncInterval()
public void setJournalDiskSyncInterval(long journalDiskSyncInterval)
public long getCheckpointInterval()
public void setCheckpointInterval(long checkpointInterval)
public long getCleanupInterval()
public void setCleanupInterval(long cleanupInterval)
public void setJournalMaxFileLength(int journalMaxFileLength)
public int getJournalMaxFileLength()
public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack)
public int getMaxFailoverProducersToTrack()
public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth)
public int getFailoverProducersAuditDepth()
public PageFile getPageFile() throws IOException
IOException
public Journal getJournal() throws IOException
IOException
protected MessageDatabase.Metadata getMetadata()
public boolean isFailIfDatabaseIsLocked()
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked)
public boolean isIgnoreMissingJournalfiles()
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)
public int getIndexCacheSize()
public void setIndexCacheSize(int indexCacheSize)
public boolean isCheckForCorruptJournalFiles()
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)
public MessageDatabase.PurgeRecoveredXATransactionStrategy getPurgeRecoveredXATransactionStrategyEnum()
public String getPurgeRecoveredXATransactionStrategy()
public void setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy)
public boolean isChecksumJournalFiles()
public void setChecksumJournalFiles(boolean checksumJournalFiles)
public void setBrokerService(BrokerService brokerService)
setBrokerService
in interface BrokerServiceAware
public boolean isArchiveDataLogs()
public void setArchiveDataLogs(boolean archiveDataLogs)
archiveDataLogs - the archiveDataLogs to set
public File getDirectoryArchive()
public void setDirectoryArchive(File directoryArchive)
directoryArchive - the directoryArchive to set
public boolean isArchiveCorruptedIndex()
public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex)
public float getIndexLFUEvictionFactor()
public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor)
public boolean isUseIndexLFRUEviction()
public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction)
public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs)
public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile)
public void setEnableIndexPageCaching(boolean enableIndexPageCaching)
public boolean isEnableIndexDiskSyncs()
public boolean isEnableIndexRecoveryFile()
public boolean isEnableIndexPageCaching()
public PersistenceAdapterStatistics getPersistenceAdapterStatistics()
public File getIndexDirectory()
public void setIndexDirectory(File indexDirectory)
public String getPreallocationScope()
public void setPreallocationScope(String preallocationScope)
public String getPreallocationStrategy()
public void setPreallocationStrategy(String preallocationStrategy)
public int getCompactAcksAfterNoGC()
public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC)
A value of -1 will disable this feature.
compactAcksAfterNoGC - Number of empty GC cycles before we rewrite old ACKS.
public boolean isCompactAcksIgnoresStoreGrowth()
public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth)
compactAcksIgnoresStoreGrowth - the compactAcksIgnoresStoreGrowth to set
public boolean isEnableAckCompaction()
public void setEnableAckCompaction(boolean enableAckCompaction)
enableAckCompaction -
public boolean isEnableSubscriptionStatistics()
public void setEnableSubscriptionStatistics(boolean enableSubscriptionStatistics)
enableSubscriptionStatistics -
Copyright © 2005–2019 The Apache Software Foundation. All rights reserved.