Package org.apache.activemq.store.jdbc
Class JDBCPersistenceAdapter
java.lang.Object
  org.apache.activemq.util.ServiceSupport
    org.apache.activemq.broker.LockableServiceSupport
      org.apache.activemq.store.jdbc.DataSourceServiceSupport
        org.apache.activemq.store.jdbc.JDBCPersistenceAdapter
- All Implemented Interfaces:
BrokerServiceAware, Lockable, Service, PersistenceAdapter
public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter
A PersistenceAdapter implementation that uses JDBC for persistent storage. This persistence adapter correctly remembers prepared XA transactions, but it does not keep track of local transaction commits, so that operations performed against the message store are done as a single unit of work (UOW).
-
-
Field Summary
Modifier and Type, Field
protected ActiveMQMessageAudit audit
protected int auditRecoveryDepth
static long DEFAULT_LOCK_KEEP_ALIVE_PERIOD
protected boolean enableAudit
protected int maxAuditDepth
protected int maxProducersToAudit
protected int maxRows
protected LongSequenceGenerator sequenceGenerator
protected HashMap<ActiveMQDestination,MessageStore> storeCache
-
Fields inherited from class org.apache.activemq.broker.LockableServiceSupport
brokerService, clockDaemon
-
-
Constructor Summary
Constructor
JDBCPersistenceAdapter()
JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat)
-
Method Summary
Modifier and Type, Method, and Description
void allowIOResumption()
void beginTransaction(ConnectionContext context)
    Starts a transaction on the persistent storage. This has nothing to do with JMS or XA transactions; it is purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization.
void checkpoint(boolean sync)
    checkpoint any
void cleanup()
void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId, long newSequence)
void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId)
void commitRemove(ConnectionContext context, MessageAck ack)
void commitTransaction(ConnectionContext context)
    Commit a persistence transaction
protected JDBCAdapter createAdapter()
Locker createDefaultLocker()
    Create a default locker
JobSchedulerStore createJobSchedulerStore()
    Creates and returns a new Job Scheduler store instance.
protected void createMessageAudit()
MessageStore createQueueMessageStore(ActiveMQQueue destination)
    Factory method to create a new queue message store with the given destination name
TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
    Factory method to create a new topic message store with the given destination name
TransactionStore createTransactionStore()
    Factory method to create a new persistent prepared transaction store for XA recovery
void deleteAllMessages()
    Deletes all the messages in the persistent store.
void doStart()
void doStop(ServiceStopper stopper)
JDBCAdapter getAdapter()
int getAuditRecoveryDepth()
BrokerService getBrokerService()
int getCleanupPeriod()
Locker getDatabaseLocker()
    Deprecated. As of 5.7.0, replaced by LockableServiceSupport.getLocker()
Set<ActiveMQDestination> getDestinations()
    Returns a set of all the ActiveMQDestination objects that the persistence store knows to exist.
File getDirectory()
long getLastMessageBrokerSequenceId()
long getLastProducerSequenceId(ProducerId id)
    Returns the last stored producer sequenceId for this producer id. It is used to suppress duplicate sends on failover reconnect at the transport when a reconnect occurs.
DataSource getLockDataSource()
int getMaxAuditDepth()
int getMaxProducersToAudit()
int getMaxRows()
int getNetworkTimeout()
long getNextSequenceId()
int getQueryTimeout()
ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor()
Statements getStatements()
TransactionContext getTransactionContext()
TransactionContext getTransactionContext(ConnectionContext context)
WireFormat getWireFormat()
void init()
    Initialize resources before locking
void initSequenceIdGenerator()
boolean isChangeAutoCommitAllowed()
boolean isCreateTablesOnStartup()
boolean isEnableAudit()
boolean isUseExternalMessageReferences()
static void log(String msg, SQLException e)
void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore)
void removeQueueMessageStore(ActiveMQQueue destination)
    Cleanup method to remove any state associated with the given destination
void removeTopicMessageStore(ActiveMQTopic destination)
    Cleanup method to remove any state associated with the given destination. No state retained....
void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId)
void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId)
void rollbackTransaction(ConnectionContext context)
    Rollback a persistence transaction
void setAdapter(JDBCAdapter adapter)
void setAuditRecoveryDepth(int auditRecoveryDepth)
void setBrokerName(String brokerName)
    Set the name of the broker using the adapter
void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed)
    Whether the JDBC driver allows changing the auto-commit setting.
void setCleanupPeriod(int cleanupPeriod)
    Sets the period, in milliseconds, after which a cleanup of the database is attempted for durable topics
void setCreateTablesOnStartup(boolean createTablesOnStartup)
    Sets whether or not tables are created on startup
void setDatabaseLocker(Locker locker)
    Deprecated. As of 5.7.0, replaced by LockableServiceSupport.setLocker(org.apache.activemq.broker.Locker)
void setDirectory(File dir)
    Set the directory where any data files should be created
void setEnableAudit(boolean enableAudit)
void setLockAcquireSleepInterval(long lockAcquireSleepInterval)
    Deprecated. Use Locker.setLockAcquireSleepInterval(long) instead. Millisecond interval between lock acquire attempts. Applied to a newly created DefaultDatabaseLocker; not applied if a DataBaseLocker is injected.
void setLockDataSource(DataSource dataSource)
void setMaxAuditDepth(int maxAuditDepth)
void setMaxProducersToAudit(int maxProducersToAudit)
void setMaxRows(int maxRows)
void setNetworkTimeout(int networkTimeout)
    Define the JDBC connection network timeout.
void setQueryTimeout(int queryTimeout)
    Define the JDBC statement query timeout.
void setStatements(Statements statements)
void setTransactionIsolation(int transactionIsolation)
    Sets the transaction isolation level to something other than TRANSACTION_READ_UNCOMMITTED. This allowable dirty isolation level may not be achievable in clustered DB environments, so a more restrictive and expensive option such as TRANSACTION_REPEATABLE_READ may be needed. See the isolation level constants in Connection.
void setUsageManager(SystemUsage usageManager)
void setUseDatabaseLock(boolean useDatabaseLock)
    Deprecated. Use LockableServiceSupport.setUseLock(boolean) instead. Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave.
void setUseExternalMessageReferences(boolean useExternalMessageReferences)
void setWireFormat(WireFormat wireFormat)
long size()
    A hint to return the size of the store on disk
String toString()
-
Methods inherited from class org.apache.activemq.store.jdbc.DataSourceServiceSupport
closeDataSource, createDataSource, createDataSource, getDataDirectory, getDataDirectoryFile, getDataSource, setDataDirectory, setDataDirectoryFile, setDataSource, shutdownDefaultDataSource
-
Methods inherited from class org.apache.activemq.broker.LockableServiceSupport
getLocker, getLockKeepAlivePeriod, isStopOnError, isUseLock, keepLockAlive, postStop, preStart, setBrokerService, setLocker, setLockKeepAlivePeriod, setScheduledThreadPoolExecutor, setStopOnError, setUseLock, stopBroker
-
Methods inherited from class org.apache.activemq.util.ServiceSupport
addServiceListener, dispose, isStarted, isStopped, isStopping, removeServiceListener, start, stop
-
-
-
-
Field Detail
-
DEFAULT_LOCK_KEEP_ALIVE_PERIOD
public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD
- See Also:
- Constant Field Values
-
maxProducersToAudit
protected int maxProducersToAudit
-
maxAuditDepth
protected int maxAuditDepth
-
enableAudit
protected boolean enableAudit
-
auditRecoveryDepth
protected int auditRecoveryDepth
-
audit
protected ActiveMQMessageAudit audit
-
sequenceGenerator
protected LongSequenceGenerator sequenceGenerator
-
maxRows
protected int maxRows
-
storeCache
protected final HashMap<ActiveMQDestination,MessageStore> storeCache
-
-
Constructor Detail
-
JDBCPersistenceAdapter
public JDBCPersistenceAdapter()
-
JDBCPersistenceAdapter
public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat)
-
-
Method Detail
-
getDestinations
public Set<ActiveMQDestination> getDestinations()
Description copied from interface: PersistenceAdapter
Returns a set of all the ActiveMQDestination objects that the persistence store knows to exist.
- Specified by:
getDestinations in interface PersistenceAdapter
- Returns:
active destinations
-
createMessageAudit
protected void createMessageAudit()
-
initSequenceIdGenerator
public void initSequenceIdGenerator()
-
createQueueMessageStore
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new queue message store with the given destination name
- Specified by:
createQueueMessageStore in interface PersistenceAdapter
- Returns:
the message store
- Throws:
IOException
-
createTopicMessageStore
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new topic message store with the given destination name
- Specified by:
createTopicMessageStore in interface PersistenceAdapter
- Returns:
the topic message store
- Throws:
IOException
-
removeQueueMessageStore
public void removeQueueMessageStore(ActiveMQQueue destination)
Cleanup method to remove any state associated with the given destination
- Specified by:
removeQueueMessageStore in interface PersistenceAdapter
- Parameters:
destination - Destination to forget
-
removeTopicMessageStore
public void removeTopicMessageStore(ActiveMQTopic destination)
Cleanup method to remove any state associated with the given destination. No state retained.... nothing to do
- Specified by:
removeTopicMessageStore in interface PersistenceAdapter
- Parameters:
destination - Destination to forget
-
createTransactionStore
public TransactionStore createTransactionStore() throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new persistent prepared transaction store for XA recovery
- Specified by:
createTransactionStore in interface PersistenceAdapter
- Returns:
transaction store
- Throws:
IOException
-
getLastMessageBrokerSequenceId
public long getLastMessageBrokerSequenceId() throws IOException
- Specified by:
getLastMessageBrokerSequenceId in interface PersistenceAdapter
- Returns:
last broker sequence
- Throws:
IOException
-
getLastProducerSequenceId
public long getLastProducerSequenceId(ProducerId id) throws IOException
Description copied from interface: PersistenceAdapter
Returns the last stored producer sequenceId for this producer id. It is used to suppress duplicate sends on failover reconnect at the transport when a reconnect occurs.
- Specified by:
getLastProducerSequenceId in interface PersistenceAdapter
- Parameters:
id - the producerId to find a sequenceId for
- Returns:
the last stored sequence id or -1 if no suppression needed
- Throws:
IOException
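The return contract of getLastProducerSequenceId (the last stored sequence id, or -1 when no suppression is needed) can be illustrated with a small in-memory sketch. The class ProducerSequenceTracker and its methods are hypothetical and are not part of ActiveMQ. Producer ids are plain strings here instead of ProducerId objects, and the duplicate rule (a sequence id at or below the last stored one) is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a store that remembers the highest
// sequence id it has persisted for each producer. Not ActiveMQ code.
final class ProducerSequenceTracker {
    private final Map<String, Long> lastSequence = new HashMap<>();

    // Record a stored message, keeping the highest sequence id per producer.
    void recordStored(String producerId, long sequenceId) {
        lastSequence.merge(producerId, sequenceId, Long::max);
    }

    // Mirrors the documented contract: last stored sequence id, or -1 if none is known.
    long getLastProducerSequenceId(String producerId) {
        return lastSequence.getOrDefault(producerId, -1L);
    }

    // Assumed rule: a resend after reconnect is a duplicate if its
    // sequence id is not greater than the last stored one.
    boolean isDuplicate(String producerId, long sequenceId) {
        return sequenceId <= getLastProducerSequenceId(producerId);
    }

    public static void main(String[] args) {
        ProducerSequenceTracker tracker = new ProducerSequenceTracker();
        tracker.recordStored("producer-1", 41L);
        tracker.recordStored("producer-1", 42L);
        System.out.println(tracker.isDuplicate("producer-1", 42L));
        System.out.println(tracker.isDuplicate("producer-1", 43L));
    }
}
```

With -1 as the "nothing stored" value, any non-negative sequence id from an unknown producer is accepted without a special case.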
-
allowIOResumption
public void allowIOResumption()
- Specified by:
allowIOResumption in interface PersistenceAdapter
-
init
public void init() throws Exception
Description copied from class: LockableServiceSupport
Initialize resources before locking
- Specified by:
init in class LockableServiceSupport
- Throws:
Exception
-
doStart
public void doStart() throws Exception
- Specified by:
doStart in class ServiceSupport
- Throws:
Exception
-
doStop
public void doStop(ServiceStopper stopper) throws Exception
- Specified by:
doStop in class ServiceSupport
- Throws:
Exception
-
cleanup
public void cleanup()
-
getScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor()
- Overrides:
getScheduledThreadPoolExecutor in class LockableServiceSupport
-
getAdapter
public JDBCAdapter getAdapter() throws IOException
- Throws:
IOException
-
getDatabaseLocker
@Deprecated public Locker getDatabaseLocker() throws IOException
Deprecated. As of 5.7.0, replaced by LockableServiceSupport.getLocker()
- Throws:
IOException
-
setDatabaseLocker
@Deprecated public void setDatabaseLocker(Locker locker) throws IOException
Deprecated. As of 5.7.0, replaced by LockableServiceSupport.setLocker(org.apache.activemq.broker.Locker)
Sets the database locker strategy to use to lock the database on startup
- Throws:
IOException
-
getLockDataSource
public DataSource getLockDataSource() throws IOException
- Throws:
IOException
-
setLockDataSource
public void setLockDataSource(DataSource dataSource)
-
getBrokerService
public BrokerService getBrokerService()
- Overrides:
getBrokerService in class LockableServiceSupport
-
createAdapter
protected JDBCAdapter createAdapter() throws IOException
- Throws:
IOException
-
setAdapter
public void setAdapter(JDBCAdapter adapter)
-
getWireFormat
public WireFormat getWireFormat()
-
setWireFormat
public void setWireFormat(WireFormat wireFormat)
-
getTransactionContext
public TransactionContext getTransactionContext(ConnectionContext context) throws IOException
- Throws:
IOException
-
getTransactionContext
public TransactionContext getTransactionContext() throws IOException
- Throws:
IOException
-
beginTransaction
public void beginTransaction(ConnectionContext context) throws IOException
Description copied from interface: PersistenceAdapter
Starts a transaction on the persistent storage. This has nothing to do with JMS or XA transactions; it is purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization. Typically one transaction requires one disk synchronization point, so for high performance it is usually faster to perform many writes within the same transaction to minimize the latency caused by disk synchronization. This is especially true when using tools like Berkeley DB or embedded JDBC servers.
- Specified by:
beginTransaction in interface PersistenceAdapter
- Throws:
IOException
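The batching argument in the beginTransaction description can be made concrete with a toy model. The sketch below is not the ActiveMQ API: BatchingStoreSketch is a hypothetical in-memory store that assumes one simulated disk sync per standalone write and one per committed transaction, which is the cost model the description suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory store, NOT the ActiveMQ API. It counts one
// simulated disk sync per standalone write and one per committed transaction.
final class BatchingStoreSketch {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private boolean inTransaction;
    private int syncCount;

    void beginTransaction() {
        if (inTransaction) {
            throw new IllegalStateException("transaction already open");
        }
        inTransaction = true;
    }

    void write(String record) {
        if (inTransaction) {
            pending.add(record);   // buffered until commit
        } else {
            committed.add(record);
            syncCount++;           // a standalone write pays for its own sync
        }
    }

    void commitTransaction() {
        requireOpen();
        committed.addAll(pending);
        pending.clear();
        inTransaction = false;
        syncCount++;               // one sync for the whole batch
    }

    void rollbackTransaction() {
        requireOpen();
        pending.clear();           // buffered writes are discarded, no sync
        inTransaction = false;
    }

    int syncCount() { return syncCount; }

    int committedCount() { return committed.size(); }

    private void requireOpen() {
        if (!inTransaction) {
            throw new IllegalStateException("no open transaction");
        }
    }

    public static void main(String[] args) {
        BatchingStoreSketch store = new BatchingStoreSketch();
        store.beginTransaction();
        for (int i = 0; i < 100; i++) {
            store.write("message-" + i);
        }
        store.commitTransaction();
        System.out.println(store.committedCount() + " writes, " + store.syncCount() + " sync(s)");
    }
}
```

In this model, three standalone writes cost three syncs while three writes inside one transaction cost one, which is the saving the description refers to.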
-
commitTransaction
public void commitTransaction(ConnectionContext context) throws IOException
Description copied from interface: PersistenceAdapter
Commit a persistence transaction
- Specified by:
commitTransaction in interface PersistenceAdapter
- Throws:
IOException
- See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)
-
rollbackTransaction
public void rollbackTransaction(ConnectionContext context) throws IOException
Description copied from interface: PersistenceAdapter
Rollback a persistence transaction
- Specified by:
rollbackTransaction in interface PersistenceAdapter
- Throws:
IOException
- See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)
-
getCleanupPeriod
public int getCleanupPeriod()
-
setCleanupPeriod
public void setCleanupPeriod(int cleanupPeriod)
Sets the period, in milliseconds, after which a cleanup of the database is attempted for durable topics
-
isChangeAutoCommitAllowed
public boolean isChangeAutoCommitAllowed()
-
setChangeAutoCommitAllowed
public void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed)
Whether the JDBC driver allows changing the auto-commit setting. Some drivers do not allow changing auto-commit. The default value is true.
- Parameters:
changeAutoCommitAllowed - true to change, false to not change.
-
getNetworkTimeout
public int getNetworkTimeout()
-
setNetworkTimeout
public void setNetworkTimeout(int networkTimeout)
Define the JDBC connection network timeout.
- Parameters:
networkTimeout - the connection network timeout (in milliseconds).
-
getQueryTimeout
public int getQueryTimeout()
-
setQueryTimeout
public void setQueryTimeout(int queryTimeout)
Define the JDBC statement query timeout.
- Parameters:
queryTimeout - the statement query timeout (in seconds).
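The two timeout setters use different units: setNetworkTimeout takes milliseconds and setQueryTimeout takes seconds. The following sketch shows one way to derive both int values from a java.time.Duration. The helper class JdbcTimeoutUnits is hypothetical. Rounding the query timeout up to a whole second is a design assumption here, made because zero means "no limit" in JDBC's Statement.setQueryTimeout.

```java
import java.time.Duration;

// Hypothetical helper, not part of ActiveMQ: converts a Duration into the
// int values expected by setNetworkTimeout (ms) and setQueryTimeout (s).
final class JdbcTimeoutUnits {
    private JdbcTimeoutUnits() {
    }

    // Milliseconds, as documented for setNetworkTimeout(int).
    static int toNetworkTimeoutMillis(Duration timeout) {
        return Math.toIntExact(timeout.toMillis());
    }

    // Seconds, as documented for setQueryTimeout(int). Rounds up so that a
    // positive sub-second duration does not become 0.
    static int toQueryTimeoutSeconds(Duration timeout) {
        long millis = timeout.toMillis();
        return Math.toIntExact((millis + 999) / 1000);
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(30);
        System.out.println(toNetworkTimeoutMillis(timeout));
        System.out.println(toQueryTimeoutSeconds(timeout));
    }
}
```

The resulting values could then be passed to setNetworkTimeout(int) and setQueryTimeout(int) on an adapter instance.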
-
deleteAllMessages
public void deleteAllMessages() throws IOException
Description copied from interface: PersistenceAdapter
Deletes all the messages in the persistent store.
- Specified by:
deleteAllMessages in interface PersistenceAdapter
- Throws:
IOException
-
isUseExternalMessageReferences
public boolean isUseExternalMessageReferences()
-
setUseExternalMessageReferences
public void setUseExternalMessageReferences(boolean useExternalMessageReferences)
-
isCreateTablesOnStartup
public boolean isCreateTablesOnStartup()
-
setCreateTablesOnStartup
public void setCreateTablesOnStartup(boolean createTablesOnStartup)
Sets whether or not tables are created on startup
-
setUseDatabaseLock
@Deprecated public void setUseDatabaseLock(boolean useDatabaseLock)
Deprecated. Use LockableServiceSupport.setUseLock(boolean) instead.
Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default.
-
log
public static void log(String msg, SQLException e)
-
getStatements
public Statements getStatements()
-
setStatements
public void setStatements(Statements statements)
-
setUsageManager
public void setUsageManager(SystemUsage usageManager)
- Specified by:
setUsageManager in interface PersistenceAdapter
- Parameters:
usageManager - The UsageManager that is controlling the destination's memory usage.
-
createDefaultLocker
public Locker createDefaultLocker() throws IOException
Description copied from interface: Lockable
Create a default locker
- Specified by:
createDefaultLocker in interface Lockable
- Returns:
default locker
- Throws:
IOException
-
setBrokerName
public void setBrokerName(String brokerName)
Description copied from interface: PersistenceAdapter
Set the name of the broker using the adapter
- Specified by:
setBrokerName in interface PersistenceAdapter
-
toString
public String toString()
- Overrides:
toString in class DataSourceServiceSupport
-
setDirectory
public void setDirectory(File dir)
Description copied from interface: PersistenceAdapter
Set the directory where any data files should be created
- Specified by:
setDirectory in interface PersistenceAdapter
-
getDirectory
public File getDirectory()
- Specified by:
getDirectory in interface PersistenceAdapter
- Returns:
the directory used by the persistence adapter
-
checkpoint
public void checkpoint(boolean sync) throws IOException
Description copied from interface: PersistenceAdapter
checkpoint any
- Specified by:
checkpoint in interface PersistenceAdapter
- Throws:
IOException
-
size
public long size()
Description copied from interface: PersistenceAdapter
A hint to return the size of the store on disk
- Specified by:
size in interface PersistenceAdapter
- Returns:
disk space used in bytes, or 0 if not implemented
-
setLockAcquireSleepInterval
@Deprecated public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException
Deprecated. Use Locker.setLockAcquireSleepInterval(long) instead.
Millisecond interval between lock acquire attempts. Applied to a newly created DefaultDatabaseLocker; not applied if a DataBaseLocker is injected.
- Throws:
IOException
-
setTransactionIsolation
public void setTransactionIsolation(int transactionIsolation)
Sets the transaction isolation level to something other than TRANSACTION_READ_UNCOMMITTED. This allowable dirty isolation level may not be achievable in clustered DB environments, so a more restrictive and expensive option such as TRANSACTION_REPEATABLE_READ may be needed. See the isolation level constants in Connection.
- Parameters:
transactionIsolation - the isolation level to use
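The int passed to setTransactionIsolation is one of the isolation level constants defined on java.sql.Connection. As a small illustration, the sketch below maps a configuration-style name to the corresponding constant. The class IsolationLevels and the accepted names are hypothetical; only the java.sql.Connection constants come from the JDK.

```java
import java.sql.Connection;

// Hypothetical helper, not part of ActiveMQ: maps a readable name to the
// java.sql.Connection isolation constant that an int-based setter expects.
final class IsolationLevels {
    private IsolationLevels() {
    }

    static int parse(String name) {
        switch (name) {
            case "READ_UNCOMMITTED":
                return Connection.TRANSACTION_READ_UNCOMMITTED;
            case "READ_COMMITTED":
                return Connection.TRANSACTION_READ_COMMITTED;
            case "REPEATABLE_READ":
                return Connection.TRANSACTION_REPEATABLE_READ;
            case "SERIALIZABLE":
                return Connection.TRANSACTION_SERIALIZABLE;
            default:
                throw new IllegalArgumentException("unknown isolation level: " + name);
        }
    }

    public static void main(String[] args) {
        // The value printed here is what would be handed to setTransactionIsolation(int).
        System.out.println(parse("REPEATABLE_READ"));
    }
}
```

Passing the constants by name rather than as literal numbers keeps the configuration readable, since the numeric values (1, 2, 4, 8) are not sequential.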
-
getMaxProducersToAudit
public int getMaxProducersToAudit()
-
setMaxProducersToAudit
public void setMaxProducersToAudit(int maxProducersToAudit)
-
getMaxAuditDepth
public int getMaxAuditDepth()
-
setMaxAuditDepth
public void setMaxAuditDepth(int maxAuditDepth)
-
isEnableAudit
public boolean isEnableAudit()
-
setEnableAudit
public void setEnableAudit(boolean enableAudit)
-
getAuditRecoveryDepth
public int getAuditRecoveryDepth()
-
setAuditRecoveryDepth
public void setAuditRecoveryDepth(int auditRecoveryDepth)
-
getNextSequenceId
public long getNextSequenceId()
-
getMaxRows
public int getMaxRows()
-
setMaxRows
public void setMaxRows(int maxRows)
-
recover
public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException
- Throws:
IOException
-
commitAdd
public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId, long newSequence) throws IOException
- Throws:
IOException
-
commitRemove
public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException
- Throws:
IOException
-
commitLastAck
public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException
- Throws:
IOException
-
rollbackLastAck
public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException
- Throws:
IOException
-
rollbackLastAck
public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException
- Throws:
IOException
-
createJobSchedulerStore
public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException
Description copied from interface: PersistenceAdapter
Creates and returns a new Job Scheduler store instance.
- Specified by:
createJobSchedulerStore in interface PersistenceAdapter
- Returns:
a new JobSchedulerStore instance if this Persistence adapter provides its own.
- Throws:
IOException - If an error occurs while creating the new JobSchedulerStore.
UnsupportedOperationException - If this adapter does not provide its own scheduler store implementation.
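Because createJobSchedulerStore may throw UnsupportedOperationException when an adapter has no scheduler store of its own, a caller might want a fallback path. The sketch below shows that pattern in isolation. SchedulerStoreFallback and StoreFactory are hypothetical names, the stores are stand-in values instead of JobSchedulerStore instances, and wrapping IOException in UncheckedIOException is a simplification for the example. It does not describe how the ActiveMQ broker itself handles this case.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical helper, not part of ActiveMQ: tries an adapter-provided
// factory first and falls back when the adapter does not support it.
final class SchedulerStoreFallback {
    private SchedulerStoreFallback() {
    }

    // Stand-in for a call shaped like createJobSchedulerStore().
    interface StoreFactory<T> {
        T create() throws IOException;
    }

    static <T> T createOrFallback(StoreFactory<T> adapter, Supplier<T> fallback) {
        try {
            return adapter.create();
        } catch (UnsupportedOperationException e) {
            // The adapter does not provide its own store; use the fallback.
            return fallback.get();
        } catch (IOException e) {
            // Creation was supported but failed; surface the error unchecked.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String store = SchedulerStoreFallback.<String>createOrFallback(
                () -> { throw new UnsupportedOperationException("no scheduler store"); },
                () -> "fallback-store");
        System.out.println(store);
    }
}
```

Only UnsupportedOperationException triggers the fallback; an IOException still propagates, so a real storage failure is not silently masked.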
-
-