Package org.apache.activemq.store.jdbc
Class JDBCPersistenceAdapter
- java.lang.Object
-
- org.apache.activemq.util.ServiceSupport
-
- org.apache.activemq.broker.LockableServiceSupport
-
- org.apache.activemq.store.jdbc.DataSourceServiceSupport
-
- org.apache.activemq.store.jdbc.JDBCPersistenceAdapter
-
- All Implemented Interfaces:
BrokerServiceAware
,Lockable
,Service
,PersistenceAdapter
public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter
APersistenceAdapter
implementation using JDBC for persistence storage. This persistence adapter will correctly remember prepared XA transactions, but it will not keep track of local transaction commits so that operations performed against the Message store are done as a single uow.
-
-
Field Summary
Fields Modifier and Type Field Description protected ActiveMQMessageAudit
audit
protected int
auditRecoveryDepth
static long
DEFAULT_LOCK_KEEP_ALIVE_PERIOD
protected boolean
enableAudit
protected int
maxAuditDepth
protected int
maxProducersToAudit
protected int
maxRows
protected LongSequenceGenerator
sequenceGenerator
protected HashMap<ActiveMQDestination,MessageStore>
storeCache
-
Fields inherited from class org.apache.activemq.broker.LockableServiceSupport
brokerService, clockDaemon
-
-
Constructor Summary
Constructors Constructor Description JDBCPersistenceAdapter()
JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Deprecated Methods Modifier and Type Method Description void
allowIOResumption()
void
beginTransaction(ConnectionContext context)
This method starts a transaction on the persistent storage - which is nothing to do with JMS or XA transactions - its purely a mechanism to perform multiple writes to a persistent store in 1 transaction as a performance optimization.void
checkpoint(boolean sync)
checkpoint anyvoid
cleanup()
void
commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId, long newSequence)
void
commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId)
void
commitRemove(ConnectionContext context, MessageAck ack)
void
commitTransaction(ConnectionContext context)
Commit a persistence transactionprotected JDBCAdapter
createAdapter()
Locker
createDefaultLocker()
Create a default lockerJobSchedulerStore
createJobSchedulerStore()
Creates and returns a new Job Scheduler store instance.protected void
createMessageAudit()
MessageStore
createQueueMessageStore(ActiveMQQueue destination)
Factory method to create a new queue message store with the given destination nameTopicMessageStore
createTopicMessageStore(ActiveMQTopic destination)
Factory method to create a new topic message store with the given destination nameTransactionStore
createTransactionStore()
Factory method to create a new persistent prepared transaction store for XA recoveryvoid
deleteAllMessages()
Delete's all the messages in the persistent store.void
doStart()
void
doStop(ServiceStopper stopper)
JDBCAdapter
getAdapter()
int
getAuditRecoveryDepth()
BrokerService
getBrokerService()
int
getCleanupPeriod()
Locker
getDatabaseLocker()
Deprecated.as of 5.7.0, replaced byLockableServiceSupport.getLocker()
Set<ActiveMQDestination>
getDestinations()
Returns a set of all theActiveMQDestination
objects that the persistence store is aware exist.File
getDirectory()
long
getLastMessageBrokerSequenceId()
long
getLastProducerSequenceId(ProducerId id)
return the last stored producer sequenceId for this producer Id used to suppress duplicate sends on failover reconnect at the transport when a reconnect occursDataSource
getLockDataSource()
int
getMaxAuditDepth()
int
getMaxProducersToAudit()
int
getMaxRows()
int
getNetworkTimeout()
long
getNextSequenceId()
int
getQueryTimeout()
ScheduledThreadPoolExecutor
getScheduledThreadPoolExecutor()
Statements
getStatements()
TransactionContext
getTransactionContext()
TransactionContext
getTransactionContext(ConnectionContext context)
WireFormat
getWireFormat()
void
init()
Initialize resources before lockingvoid
initSequenceIdGenerator()
boolean
isChangeAutoCommitAllowed()
boolean
isCreateTablesOnStartup()
boolean
isEnableAudit()
boolean
isUseExternalMessageReferences()
static void
log(String msg, SQLException e)
void
recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore)
void
removeQueueMessageStore(ActiveMQQueue destination)
Cleanup method to remove any state associated with the given destinationvoid
removeTopicMessageStore(ActiveMQTopic destination)
Cleanup method to remove any state associated with the given destination No state retained....void
rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId)
void
rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId)
void
rollbackTransaction(ConnectionContext context)
Rollback a persistence transactionvoid
setAdapter(JDBCAdapter adapter)
void
setAuditRecoveryDepth(int auditRecoveryDepth)
void
setBrokerName(String brokerName)
Set the name of the broker using the adaptervoid
setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed)
Whether the JDBC driver allows to set the auto commit.void
setCleanupPeriod(int cleanupPeriod)
Sets the number of milliseconds until the database is attempted to be cleaned up for durable topicsvoid
setCreateTablesOnStartup(boolean createTablesOnStartup)
Sets whether or not tables are created on startupvoid
setDatabaseLocker(Locker locker)
Deprecated.as of 5.7.0, replaced byLockableServiceSupport.setLocker(org.apache.activemq.broker.Locker)
void
setDirectory(File dir)
Set the directory where any data files should be createdvoid
setEnableAudit(boolean enableAudit)
void
setLockAcquireSleepInterval(long lockAcquireSleepInterval)
Deprecated.useLocker.setLockAcquireSleepInterval(long)
instead millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker not applied if DataBaseLocker is injected.void
setLockDataSource(DataSource dataSource)
void
setMaxAuditDepth(int maxAuditDepth)
void
setMaxProducersToAudit(int maxProducersToAudit)
void
setMaxRows(int maxRows)
void
setNetworkTimeout(int networkTimeout)
Define the JDBC connection network timeout.void
setQueryTimeout(int queryTimeout)
Define the JDBC statement query timeout.void
setStatements(Statements statements)
void
setTransactionIsolation(int transactionIsolation)
set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED This allowable dirty isolation level may not be achievable in clustered DB environments so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ see isolation level constants inConnection
void
setUsageManager(SystemUsage usageManager)
void
setUseDatabaseLock(boolean useDatabaseLock)
Deprecated.useLockableServiceSupport.setUseLock(boolean)
instead Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave.void
setUseExternalMessageReferences(boolean useExternalMessageReferences)
void
setWireFormat(WireFormat wireFormat)
long
size()
A hint to return the size of the store on diskString
toString()
-
Methods inherited from class org.apache.activemq.store.jdbc.DataSourceServiceSupport
closeDataSource, createDataSource, createDataSource, getDataDirectory, getDataDirectoryFile, getDataSource, setDataDirectory, setDataDirectoryFile, setDataSource, shutdownDefaultDataSource
-
Methods inherited from class org.apache.activemq.broker.LockableServiceSupport
getLocker, getLockKeepAlivePeriod, isStopOnError, isUseLock, keepLockAlive, postStop, preStart, setBrokerService, setLocker, setLockKeepAlivePeriod, setScheduledThreadPoolExecutor, setStopOnError, setUseLock, stopBroker
-
Methods inherited from class org.apache.activemq.util.ServiceSupport
addServiceListener, dispose, isStarted, isStopped, isStopping, removeServiceListener, start, stop
-
-
-
-
Field Detail
-
DEFAULT_LOCK_KEEP_ALIVE_PERIOD
public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD
- See Also:
- Constant Field Values
-
maxProducersToAudit
protected int maxProducersToAudit
-
maxAuditDepth
protected int maxAuditDepth
-
enableAudit
protected boolean enableAudit
-
auditRecoveryDepth
protected int auditRecoveryDepth
-
audit
protected ActiveMQMessageAudit audit
-
sequenceGenerator
protected LongSequenceGenerator sequenceGenerator
-
maxRows
protected int maxRows
-
storeCache
protected final HashMap<ActiveMQDestination,MessageStore> storeCache
-
-
Constructor Detail
-
JDBCPersistenceAdapter
public JDBCPersistenceAdapter()
-
JDBCPersistenceAdapter
public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat)
-
-
Method Detail
-
getDestinations
public Set<ActiveMQDestination> getDestinations()
Description copied from interface:PersistenceAdapter
Returns a set of all theActiveMQDestination
objects that the persistence store is aware exist.- Specified by:
getDestinations
in interfacePersistenceAdapter
- Returns:
- active destinations
-
createMessageAudit
protected void createMessageAudit()
-
initSequenceIdGenerator
public void initSequenceIdGenerator()
-
createQueueMessageStore
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException
Description copied from interface:PersistenceAdapter
Factory method to create a new queue message store with the given destination name- Specified by:
createQueueMessageStore
in interfacePersistenceAdapter
- Returns:
- the message store
- Throws:
IOException
-
createTopicMessageStore
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException
Description copied from interface:PersistenceAdapter
Factory method to create a new topic message store with the given destination name- Specified by:
createTopicMessageStore
in interfacePersistenceAdapter
- Returns:
- the topic message store
- Throws:
IOException
-
removeQueueMessageStore
public void removeQueueMessageStore(ActiveMQQueue destination)
Cleanup method to remove any state associated with the given destination- Specified by:
removeQueueMessageStore
in interfacePersistenceAdapter
- Parameters:
destination
- Destination to forget
-
removeTopicMessageStore
public void removeTopicMessageStore(ActiveMQTopic destination)
Cleanup method to remove any state associated with the given destination No state retained.... nothing to do- Specified by:
removeTopicMessageStore
in interfacePersistenceAdapter
- Parameters:
destination
- Destination to forget
-
createTransactionStore
public TransactionStore createTransactionStore() throws IOException
Description copied from interface:PersistenceAdapter
Factory method to create a new persistent prepared transaction store for XA recovery- Specified by:
createTransactionStore
in interfacePersistenceAdapter
- Returns:
- transaction store
- Throws:
IOException
-
getLastMessageBrokerSequenceId
public long getLastMessageBrokerSequenceId() throws IOException
- Specified by:
getLastMessageBrokerSequenceId
in interfacePersistenceAdapter
- Returns:
- last broker sequence
- Throws:
IOException
-
getLastProducerSequenceId
public long getLastProducerSequenceId(ProducerId id) throws IOException
Description copied from interface:PersistenceAdapter
return the last stored producer sequenceId for this producer Id used to suppress duplicate sends on failover reconnect at the transport when a reconnect occurs- Specified by:
getLastProducerSequenceId
in interfacePersistenceAdapter
- Parameters:
id
- the producerId to find a sequenceId for- Returns:
- the last stored sequence id or -1 if no suppression needed
- Throws:
IOException
-
allowIOResumption
public void allowIOResumption()
- Specified by:
allowIOResumption
in interfacePersistenceAdapter
-
init
public void init() throws Exception
Description copied from class:LockableServiceSupport
Initialize resources before locking- Specified by:
init
in classLockableServiceSupport
- Throws:
Exception
-
doStart
public void doStart() throws Exception
- Specified by:
doStart
in classServiceSupport
- Throws:
Exception
-
doStop
public void doStop(ServiceStopper stopper) throws Exception
- Specified by:
doStop
in classServiceSupport
- Throws:
Exception
-
cleanup
public void cleanup()
-
getScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor()
- Overrides:
getScheduledThreadPoolExecutor
in classLockableServiceSupport
-
getAdapter
public JDBCAdapter getAdapter() throws IOException
- Throws:
IOException
-
getDatabaseLocker
@Deprecated public Locker getDatabaseLocker() throws IOException
Deprecated.as of 5.7.0, replaced byLockableServiceSupport.getLocker()
- Throws:
IOException
-
setDatabaseLocker
@Deprecated public void setDatabaseLocker(Locker locker) throws IOException
Deprecated.as of 5.7.0, replaced byLockableServiceSupport.setLocker(org.apache.activemq.broker.Locker)
Sets the database locker strategy to use to lock the database on startup- Throws:
IOException
-
getLockDataSource
public DataSource getLockDataSource() throws IOException
- Throws:
IOException
-
setLockDataSource
public void setLockDataSource(DataSource dataSource)
-
getBrokerService
public BrokerService getBrokerService()
- Overrides:
getBrokerService
in classLockableServiceSupport
-
createAdapter
protected JDBCAdapter createAdapter() throws IOException
- Throws:
IOException
-
setAdapter
public void setAdapter(JDBCAdapter adapter)
-
getWireFormat
public WireFormat getWireFormat()
-
setWireFormat
public void setWireFormat(WireFormat wireFormat)
-
getTransactionContext
public TransactionContext getTransactionContext(ConnectionContext context) throws IOException
- Throws:
IOException
-
getTransactionContext
public TransactionContext getTransactionContext() throws IOException
- Throws:
IOException
-
beginTransaction
public void beginTransaction(ConnectionContext context) throws IOException
Description copied from interface:PersistenceAdapter
This method starts a transaction on the persistent storage - which is nothing to do with JMS or XA transactions - its purely a mechanism to perform multiple writes to a persistent store in 1 transaction as a performance optimization. Typically one transaction will require one disk synchronization point and so for real high performance its usually faster to perform many writes within the same transaction to minimize latency caused by disk synchronization. This is especially true when using tools like Berkeley Db or embedded JDBC servers.- Specified by:
beginTransaction
in interfacePersistenceAdapter
- Throws:
IOException
-
commitTransaction
public void commitTransaction(ConnectionContext context) throws IOException
Description copied from interface:PersistenceAdapter
Commit a persistence transaction- Specified by:
commitTransaction
in interfacePersistenceAdapter
- Throws:
IOException
- See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)
-
rollbackTransaction
public void rollbackTransaction(ConnectionContext context) throws IOException
Description copied from interface:PersistenceAdapter
Rollback a persistence transaction- Specified by:
rollbackTransaction
in interfacePersistenceAdapter
- Throws:
IOException
- See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)
-
getCleanupPeriod
public int getCleanupPeriod()
-
setCleanupPeriod
public void setCleanupPeriod(int cleanupPeriod)
Sets the number of milliseconds until the database is attempted to be cleaned up for durable topics
-
isChangeAutoCommitAllowed
public boolean isChangeAutoCommitAllowed()
-
setChangeAutoCommitAllowed
public void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed)
Whether the JDBC driver allows to set the auto commit. Some drivers does not allow changing the auto commit. The default value is true.- Parameters:
changeAutoCommitAllowed
- true to change, false to not change.
-
getNetworkTimeout
public int getNetworkTimeout()
-
setNetworkTimeout
public void setNetworkTimeout(int networkTimeout)
Define the JDBC connection network timeout.- Parameters:
networkTimeout
- the connection network timeout (in milliseconds).
-
getQueryTimeout
public int getQueryTimeout()
-
setQueryTimeout
public void setQueryTimeout(int queryTimeout)
Define the JDBC statement query timeout.- Parameters:
queryTimeout
- the statement query timeout (in seconds).
-
deleteAllMessages
public void deleteAllMessages() throws IOException
Description copied from interface:PersistenceAdapter
Delete's all the messages in the persistent store.- Specified by:
deleteAllMessages
in interfacePersistenceAdapter
- Throws:
IOException
-
isUseExternalMessageReferences
public boolean isUseExternalMessageReferences()
-
setUseExternalMessageReferences
public void setUseExternalMessageReferences(boolean useExternalMessageReferences)
-
isCreateTablesOnStartup
public boolean isCreateTablesOnStartup()
-
setCreateTablesOnStartup
public void setCreateTablesOnStartup(boolean createTablesOnStartup)
Sets whether or not tables are created on startup
-
setUseDatabaseLock
@Deprecated public void setUseDatabaseLock(boolean useDatabaseLock)
Deprecated.useLockableServiceSupport.setUseLock(boolean)
instead Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default.
-
log
public static void log(String msg, SQLException e)
-
getStatements
public Statements getStatements()
-
setStatements
public void setStatements(Statements statements)
-
setUsageManager
public void setUsageManager(SystemUsage usageManager)
- Specified by:
setUsageManager
in interfacePersistenceAdapter
- Parameters:
usageManager
- The UsageManager that is controlling the destination's memory usage.
-
createDefaultLocker
public Locker createDefaultLocker() throws IOException
Description copied from interface:Lockable
Create a default locker- Specified by:
createDefaultLocker
in interfaceLockable
- Returns:
- default locker
- Throws:
IOException
-
setBrokerName
public void setBrokerName(String brokerName)
Description copied from interface:PersistenceAdapter
Set the name of the broker using the adapter- Specified by:
setBrokerName
in interfacePersistenceAdapter
-
toString
public String toString()
- Overrides:
toString
in classDataSourceServiceSupport
-
setDirectory
public void setDirectory(File dir)
Description copied from interface:PersistenceAdapter
Set the directory where any data files should be created- Specified by:
setDirectory
in interfacePersistenceAdapter
-
getDirectory
public File getDirectory()
- Specified by:
getDirectory
in interfacePersistenceAdapter
- Returns:
- the directory used by the persistence adaptor
-
checkpoint
public void checkpoint(boolean sync) throws IOException
Description copied from interface:PersistenceAdapter
checkpoint any- Specified by:
checkpoint
in interfacePersistenceAdapter
- Throws:
IOException
-
size
public long size()
Description copied from interface:PersistenceAdapter
A hint to return the size of the store on disk- Specified by:
size
in interfacePersistenceAdapter
- Returns:
- disk space used in bytes of 0 if not implemented
-
setLockAcquireSleepInterval
@Deprecated public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException
Deprecated.useLocker.setLockAcquireSleepInterval(long)
instead millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker not applied if DataBaseLocker is injected.- Throws:
IOException
-
setTransactionIsolation
public void setTransactionIsolation(int transactionIsolation)
set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED This allowable dirty isolation level may not be achievable in clustered DB environments so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ see isolation level constants inConnection
- Parameters:
transactionIsolation
- the isolation level to use
-
getMaxProducersToAudit
public int getMaxProducersToAudit()
-
setMaxProducersToAudit
public void setMaxProducersToAudit(int maxProducersToAudit)
-
getMaxAuditDepth
public int getMaxAuditDepth()
-
setMaxAuditDepth
public void setMaxAuditDepth(int maxAuditDepth)
-
isEnableAudit
public boolean isEnableAudit()
-
setEnableAudit
public void setEnableAudit(boolean enableAudit)
-
getAuditRecoveryDepth
public int getAuditRecoveryDepth()
-
setAuditRecoveryDepth
public void setAuditRecoveryDepth(int auditRecoveryDepth)
-
getNextSequenceId
public long getNextSequenceId()
-
getMaxRows
public int getMaxRows()
-
setMaxRows
public void setMaxRows(int maxRows)
-
recover
public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException
- Throws:
IOException
-
commitAdd
public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId, long newSequence) throws IOException
- Throws:
IOException
-
commitRemove
public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException
- Throws:
IOException
-
commitLastAck
public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException
- Throws:
IOException
-
rollbackLastAck
public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException
- Throws:
IOException
-
rollbackLastAck
public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException
- Throws:
IOException
-
createJobSchedulerStore
public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException
Description copied from interface:PersistenceAdapter
Creates and returns a new Job Scheduler store instance.- Specified by:
createJobSchedulerStore
in interfacePersistenceAdapter
- Returns:
- a new JobSchedulerStore instance if this Persistence adapter provides its own.
- Throws:
IOException
- If an error occurs while creating the new JobSchedulerStore.UnsupportedOperationException
- If this adapter does not provide its own scheduler store implementation.
-
-