Class MessageDatabase

    • Field Detail

      • LOG_SLOW_ACCESS_TIME

        public static final int LOG_SLOW_ACCESS_TIME
      • DEFAULT_DIRECTORY

        public static final File DEFAULT_DIRECTORY
      • UNMATCHED

        protected static final org.apache.activemq.protobuf.Buffer UNMATCHED
      • journal

        protected Journal journal
      • metadataMarshaller

        protected org.apache.activemq.store.kahadb.MessageDatabase.MetadataMarshaller metadataMarshaller
      • failIfDatabaseIsLocked

        protected boolean failIfDatabaseIsLocked
      • deleteAllMessages

        protected boolean deleteAllMessages
      • directory

        protected File directory
      • indexDirectory

        protected File indexDirectory
      • archiveDataLogs

        protected boolean archiveDataLogs
      • directoryArchive

        protected File directoryArchive
      • forceRecoverIndex

        protected boolean forceRecoverIndex
      • storedDestinations

        protected final HashMap<String,​org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination> storedDestinations
      • preparedTransactions

        protected final LinkedHashMap<TransactionId,​List<org.apache.activemq.store.kahadb.MessageDatabase.Operation>> preparedTransactions
    • Constructor Detail

      • MessageDatabase

        public MessageDatabase()
    • Method Detail

      • allowIOResumption

        public void allowIOResumption()
      • getInProgressTxLocationRange

        public Location[] getInProgressTxLocationRange()
      • getTransactions

        public String getTransactions()
      • getPreparedTransaction

        public String getPreparedTransaction​(TransactionId transactionId)
      • checkpointCleanup

        protected void checkpointCleanup​(boolean cleanup)
                                  throws IOException
        Throws:
        IOException
      • store

        public Location store​(JournalCommand<?> data,
                              boolean sync,
                              org.apache.activemq.store.kahadb.MessageDatabase.IndexAware before,
                              Runnable after,
                              Runnable onJournalStoreComplete)
                       throws IOException
        All updated are are funneled through this method. The updates are converted to a JournalMessage which is logged to the journal and then the data from the JournalMessage is used to update the index just like it would be done during a recovery process.
        Throws:
        IOException
      • processLocation

        protected void processLocation​(Location location)
      • getJournalFilesBeingReplicated

        public HashSet<Integer> getJournalFilesBeingReplicated()
      • clearStoreStats

        protected void clearStoreStats​(KahaDestination kahaDestination)
        Clear the counter for the destination, if one exists.
        Parameters:
        kahaDestination -
      • incrementAndAddSizeToStoreStat

        protected void incrementAndAddSizeToStoreStat​(Transaction tx,
                                                      KahaDestination kahaDestination,
                                                      long size)
                                               throws IOException
        Update MessageStoreStatistics
        Parameters:
        kahaDestination -
        size -
        Throws:
        IOException
      • incrementAndAddSizeToStoreStat

        protected void incrementAndAddSizeToStoreStat​(Transaction tx,
                                                      String kahaDestKey,
                                                      org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
                                                      long size)
                                               throws IOException
        Throws:
        IOException
      • decrementAndSubSizeToStoreStat

        protected void decrementAndSubSizeToStoreStat​(Transaction tx,
                                                      String kahaDestKey,
                                                      org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
                                                      long size)
                                               throws IOException
        Throws:
        IOException
      • incrementAndAddSizeToStoreStat

        protected void incrementAndAddSizeToStoreStat​(KahaDestination kahaDestination,
                                                      String subKey,
                                                      long size)
      • incrementAndAddSizeToStoreStat

        protected void incrementAndAddSizeToStoreStat​(String kahaDestKey,
                                                      String subKey,
                                                      long size)
      • decrementAndSubSizeToStoreStat

        protected void decrementAndSubSizeToStoreStat​(String kahaDestKey,
                                                      String subKey,
                                                      long size)
      • decrementAndSubSizeToStoreStat

        protected void decrementAndSubSizeToStoreStat​(KahaDestination kahaDestination,
                                                      String subKey,
                                                      long size)
      • getStoreStats

        protected MessageStoreStatistics getStoreStats​(String kahaDestKey)
        Locate the storeMessageSize counter for this KahaDestination
      • matchType

        protected boolean matchType​(Destination destination,
                                    KahaDestination.DestinationType type)
        Determine whether this Destination matches the DestinationType
        Parameters:
        destination -
        type -
        Returns:
      • getLastAck

        public org.apache.activemq.store.kahadb.MessageDatabase.LastAck getLastAck​(Transaction tx,
                                                                                   org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
                                                                                   String subscriptionKey)
                                                                            throws IOException
        Throws:
        IOException
      • getStoredMessageCount

        protected long getStoredMessageCount​(Transaction tx,
                                             org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
                                             String subscriptionKey)
                                      throws IOException
        Throws:
        IOException
      • getStoredMessageSize

        protected Map<String,​AtomicLong> getStoredMessageSize​(Transaction tx,
                                                                    org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
                                                                    List<String> subscriptionKeys)
                                                             throws IOException
        Recovers durable subscription pending message size with only 1 pass over the order index on recovery instead of iterating over the index once per subscription
        Parameters:
        tx -
        sd -
        subscriptionKeys -
        Returns:
        Throws:
        IOException
      • getStoredMessageSize

        protected long getStoredMessageSize​(Transaction tx,
                                            org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination sd,
                                            String subscriptionKey)
                                     throws IOException
        Throws:
        IOException
      • configureMetadata

        protected abstract void configureMetadata()
      • getJournalMaxWriteBatchSize

        public int getJournalMaxWriteBatchSize()
      • setJournalMaxWriteBatchSize

        public void setJournalMaxWriteBatchSize​(int journalMaxWriteBatchSize)
      • getDirectory

        public File getDirectory()
      • setDirectory

        public void setDirectory​(File directory)
      • isDeleteAllMessages

        public boolean isDeleteAllMessages()
      • setDeleteAllMessages

        public void setDeleteAllMessages​(boolean deleteAllMessages)
      • setIndexWriteBatchSize

        public void setIndexWriteBatchSize​(int setIndexWriteBatchSize)
      • getIndexWriteBatchSize

        public int getIndexWriteBatchSize()
      • setEnableIndexWriteAsync

        public void setEnableIndexWriteAsync​(boolean enableIndexWriteAsync)
      • getJournalDiskSyncStrategy

        public String getJournalDiskSyncStrategy()
      • setJournalDiskSyncStrategy

        public void setJournalDiskSyncStrategy​(String journalDiskSyncStrategy)
      • getJournalDiskSyncInterval

        public long getJournalDiskSyncInterval()
      • setJournalDiskSyncInterval

        public void setJournalDiskSyncInterval​(long journalDiskSyncInterval)
      • getCheckpointInterval

        public long getCheckpointInterval()
      • setCheckpointInterval

        public void setCheckpointInterval​(long checkpointInterval)
      • getCleanupInterval

        public long getCleanupInterval()
      • setCleanupInterval

        public void setCleanupInterval​(long cleanupInterval)
      • getCleanupOnStop

        public boolean getCleanupOnStop()
      • setCleanupOnStop

        public void setCleanupOnStop​(boolean cleanupOnStop)
      • setJournalMaxFileLength

        public void setJournalMaxFileLength​(int journalMaxFileLength)
      • getJournalMaxFileLength

        public int getJournalMaxFileLength()
      • setMaxFailoverProducersToTrack

        public void setMaxFailoverProducersToTrack​(int maxFailoverProducersToTrack)
      • getMaxFailoverProducersToTrack

        public int getMaxFailoverProducersToTrack()
      • setFailoverProducersAuditDepth

        public void setFailoverProducersAuditDepth​(int failoverProducersAuditDepth)
      • getFailoverProducersAuditDepth

        public int getFailoverProducersAuditDepth()
      • isFailIfDatabaseIsLocked

        public boolean isFailIfDatabaseIsLocked()
      • setFailIfDatabaseIsLocked

        public void setFailIfDatabaseIsLocked​(boolean failIfDatabaseIsLocked)
      • isIgnoreMissingJournalfiles

        public boolean isIgnoreMissingJournalfiles()
      • setIgnoreMissingJournalfiles

        public void setIgnoreMissingJournalfiles​(boolean ignoreMissingJournalfiles)
      • getIndexCacheSize

        public int getIndexCacheSize()
      • setIndexCacheSize

        public void setIndexCacheSize​(int indexCacheSize)
      • isCheckForCorruptJournalFiles

        public boolean isCheckForCorruptJournalFiles()
      • setCheckForCorruptJournalFiles

        public void setCheckForCorruptJournalFiles​(boolean checkForCorruptJournalFiles)
      • getPurgeRecoveredXATransactionStrategy

        public String getPurgeRecoveredXATransactionStrategy()
      • setPurgeRecoveredXATransactionStrategy

        public void setPurgeRecoveredXATransactionStrategy​(String purgeRecoveredXATransactionStrategy)
      • isChecksumJournalFiles

        public boolean isChecksumJournalFiles()
      • setChecksumJournalFiles

        public void setChecksumJournalFiles​(boolean checksumJournalFiles)
      • isArchiveDataLogs

        public boolean isArchiveDataLogs()
        Returns:
        the archiveDataLogs
      • setArchiveDataLogs

        public void setArchiveDataLogs​(boolean archiveDataLogs)
        Parameters:
        archiveDataLogs - the archiveDataLogs to set
      • getDirectoryArchive

        public File getDirectoryArchive()
        Returns:
        the directoryArchive
      • setDirectoryArchive

        public void setDirectoryArchive​(File directoryArchive)
        Parameters:
        directoryArchive - the directoryArchive to set
      • isArchiveCorruptedIndex

        public boolean isArchiveCorruptedIndex()
      • setArchiveCorruptedIndex

        public void setArchiveCorruptedIndex​(boolean archiveCorruptedIndex)
      • getIndexLFUEvictionFactor

        public float getIndexLFUEvictionFactor()
      • setIndexLFUEvictionFactor

        public void setIndexLFUEvictionFactor​(float indexLFUEvictionFactor)
      • isUseIndexLFRUEviction

        public boolean isUseIndexLFRUEviction()
      • setUseIndexLFRUEviction

        public void setUseIndexLFRUEviction​(boolean useIndexLFRUEviction)
      • setEnableIndexDiskSyncs

        public void setEnableIndexDiskSyncs​(boolean enableIndexDiskSyncs)
      • setEnableIndexRecoveryFile

        public void setEnableIndexRecoveryFile​(boolean enableIndexRecoveryFile)
      • setEnableIndexPageCaching

        public void setEnableIndexPageCaching​(boolean enableIndexPageCaching)
      • isEnableIndexDiskSyncs

        public boolean isEnableIndexDiskSyncs()
      • isEnableIndexRecoveryFile

        public boolean isEnableIndexRecoveryFile()
      • isEnableIndexPageCaching

        public boolean isEnableIndexPageCaching()
      • getIndexDirectory

        public File getIndexDirectory()
      • setIndexDirectory

        public void setIndexDirectory​(File indexDirectory)
      • getPreallocationScope

        public String getPreallocationScope()
      • setPreallocationScope

        public void setPreallocationScope​(String preallocationScope)
      • getPreallocationStrategy

        public String getPreallocationStrategy()
      • setPreallocationStrategy

        public void setPreallocationStrategy​(String preallocationStrategy)
      • getCompactAcksAfterNoGC

        public int getCompactAcksAfterNoGC()
      • setCompactAcksAfterNoGC

        public void setCompactAcksAfterNoGC​(int compactAcksAfterNoGC)
        Sets the number of GC cycles where no journal logs were removed before an attempt to move forward all the acks in the last log that contains them and is otherwise unreferenced.

        A value of -1 will disable this feature.

        Parameters:
        compactAcksAfterNoGC - Number of empty GC cycles before we rewrite old ACKS.
      • isCompactAcksIgnoresStoreGrowth

        public boolean isCompactAcksIgnoresStoreGrowth()
        Returns whether Ack compaction will ignore that the store is still growing and run more often.
        Returns:
        the compactAcksIgnoresStoreGrowth current value.
      • setCompactAcksIgnoresStoreGrowth

        public void setCompactAcksIgnoresStoreGrowth​(boolean compactAcksIgnoresStoreGrowth)
        Configure if Ack compaction will occur regardless of continued growth of the journal logs meaning that the store has not run out of space yet. Because the compaction operation can be costly this value is defaulted to off and the Ack compaction is only done when it seems that the store cannot grow and larger.
        Parameters:
        compactAcksIgnoresStoreGrowth - the compactAcksIgnoresStoreGrowth to set
      • isEnableAckCompaction

        public boolean isEnableAckCompaction()
        Returns whether Ack compaction is enabled
        Returns:
        enableAckCompaction
      • setEnableAckCompaction

        public void setEnableAckCompaction​(boolean enableAckCompaction)
        Configure if the Ack compaction task should be enabled to run
        Parameters:
        enableAckCompaction -
      • isEnableSubscriptionStatistics

        public boolean isEnableSubscriptionStatistics()
        Returns:
      • setEnableSubscriptionStatistics

        public void setEnableSubscriptionStatistics​(boolean enableSubscriptionStatistics)
        Enable caching statistics for each subscription to allow non-blocking retrieval of metrics. This could incur some overhead to compute if there are a lot of subscriptions.
        Parameters:
        enableSubscriptionStatistics -