org.apache.activemq.store.jdbc
Class JDBCPersistenceAdapter

java.lang.Object
  extended by org.apache.activemq.util.ServiceSupport
      extended by org.apache.activemq.broker.LockableServiceSupport
          extended by org.apache.activemq.store.jdbc.DataSourceServiceSupport
              extended by org.apache.activemq.store.jdbc.JDBCPersistenceAdapter
All Implemented Interfaces:
BrokerServiceAware, Lockable, Service, PersistenceAdapter

public class JDBCPersistenceAdapter
extends DataSourceServiceSupport
implements PersistenceAdapter

A PersistenceAdapter implementation using JDBC for persistence storage. This persistence adapter will correctly remember prepared XA transactions, but it will not keep track of local transaction commits, so that operations performed against the Message store are done as a single unit of work (UOW).


Field Summary
protected  ActiveMQMessageAudit audit
           
protected  int auditRecoveryDepth
           
static long DEFAULT_LOCK_KEEP_ALIVE_PERIOD
           
protected  boolean enableAudit
           
protected  int maxAuditDepth
           
protected  int maxProducersToAudit
           
protected  int maxRows
           
protected  LongSequenceGenerator sequenceGenerator
           
 
Fields inherited from class org.apache.activemq.broker.LockableServiceSupport
brokerService
 
Constructor Summary
JDBCPersistenceAdapter()
           
JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat)
           
 
Method Summary
 void beginTransaction(ConnectionContext context)
          This method starts a transaction on the persistent storage - which has nothing to do with JMS or XA transactions - it is purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization.
 void checkpoint(boolean sync)
          checkpoint any
 void cleanup()
           
 void commitAdd(ConnectionContext context, MessageId messageId)
           
 void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId)
           
 void commitRemove(ConnectionContext context, MessageAck ack)
           
 void commitTransaction(ConnectionContext context)
          Commit a persistence transaction
protected  JDBCAdapter createAdapter()
           
 Locker createDefaultLocker()
          Create a default locker
protected  void createMessageAudit()
           
 MessageStore createQueueMessageStore(ActiveMQQueue destination)
          Factory method to create a new queue message store with the given destination name
 TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
          Factory method to create a new topic message store with the given destination name
 TransactionStore createTransactionStore()
          Factory method to create a new persistent prepared transaction store for XA recovery
 void deleteAllMessages()
          Deletes all the messages in the persistent store.
 void doStart()
           
 void doStop(ServiceStopper stopper)
           
 JDBCAdapter getAdapter()
           
 int getAuditRecoveryDepth()
           
 BrokerService getBrokerService()
           
 int getCleanupPeriod()
           
 Locker getDatabaseLocker()
          Deprecated. as of 5.7.0, replaced by LockableServiceSupport.getLocker()
 Set<ActiveMQDestination> getDestinations()
          Returns a set of all the ActiveMQDestination objects that the persistence store is aware exist.
 File getDirectory()
           
 long getLastMessageBrokerSequenceId()
           
 long getLastProducerSequenceId(ProducerId id)
          Returns the last stored producer sequenceId for this producer Id; used to suppress duplicate sends at the transport when a failover reconnect occurs.
 DataSource getLockDataSource()
           
 int getMaxAuditDepth()
           
 int getMaxProducersToAudit()
           
 int getMaxRows()
           
 long getNextSequenceId()
           
 ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor()
           
 Statements getStatements()
           
 TransactionContext getTransactionContext()
           
 TransactionContext getTransactionContext(ConnectionContext context)
           
 WireFormat getWireFormat()
           
 void init()
          Initialize resources before locking
 void initSequenceIdGenerator()
           
 boolean isChangeAutoCommitAllowed()
           
 boolean isCreateTablesOnStartup()
           
 boolean isEnableAudit()
           
 boolean isUseExternalMessageReferences()
           
static void log(String msg, SQLException e)
           
 void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore)
           
 void removeQueueMessageStore(ActiveMQQueue destination)
          Cleanup method to remove any state associated with the given destination
 void removeTopicMessageStore(ActiveMQTopic destination)
          Cleanup method to remove any state associated with the given destination. No state is retained.
 void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId)
           
 void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId)
           
 void rollbackTransaction(ConnectionContext context)
          Rollback a persistence transaction
 void setAdapter(JDBCAdapter adapter)
           
 void setAuditRecoveryDepth(int auditRecoveryDepth)
           
 void setBrokerName(String brokerName)
          Set the name of the broker using the adapter
 void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed)
          Whether the JDBC driver allows the auto-commit setting to be changed.
 void setCleanupPeriod(int cleanupPeriod)
          Sets the interval, in milliseconds, between attempts to clean up the database for durable topics.
 void setCreateTablesOnStartup(boolean createTablesOnStartup)
          Sets whether or not tables are created on startup
 void setDatabaseLocker(Locker locker)
          Deprecated. as of 5.7.0, replaced by LockableServiceSupport.setLocker(org.apache.activemq.broker.Locker)
 void setDirectory(File dir)
          Set the directory where any data files should be created
 void setEnableAudit(boolean enableAudit)
           
 void setLockAcquireSleepInterval(long lockAcquireSleepInterval)
          Deprecated. Use Locker.setLockAcquireSleepInterval(long) instead. Millisecond interval between lock acquire attempts; applied to a newly created DefaultDatabaseLocker, not applied if a DataBaseLocker is injected.
 void setLockDataSource(DataSource dataSource)
           
 void setMaxAuditDepth(int maxAuditDepth)
           
 void setMaxProducersToAudit(int maxProducersToAudit)
           
 void setMaxRows(int maxRows)
           
 void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon)
           
 void setStatements(Statements statements)
           
 void setTransactionIsolation(int transactionIsolation)
          Sets the transaction isolation level to something other than TRANSACTION_READ_UNCOMMITTED. This allowable dirty isolation level may not be achievable in clustered DB environments, so a more restrictive and expensive option may be needed, like TRANSACTION_REPEATABLE_READ. See the isolation level constants in Connection.
 void setUsageManager(SystemUsage usageManager)
           
 void setUseDatabaseLock(boolean useDatabaseLock)
          Deprecated. Use LockableServiceSupport.setUseLock(boolean) instead. Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default.
 void setUseExternalMessageReferences(boolean useExternalMessageReferences)
           
 void setWireFormat(WireFormat wireFormat)
           
 long size()
          A hint to return the size of the store on disk
 String toString()
           
 
Methods inherited from class org.apache.activemq.store.jdbc.DataSourceServiceSupport
createDataSource, getDataDirectory, getDataDirectoryFile, getDataSource, setDataDirectory, setDataDirectoryFile, setDataSource
 
Methods inherited from class org.apache.activemq.broker.LockableServiceSupport
getLocker, keepLockAlive, postStop, preStart, setBrokerService, setLocker, setLockKeepAlivePeriod, setUseLock, stopBroker
 
Methods inherited from class org.apache.activemq.util.ServiceSupport
addServiceListener, dispose, isStarted, isStopped, isStopping, removeServiceListener, start, stop
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface org.apache.activemq.Service
start, stop
 

Field Detail

DEFAULT_LOCK_KEEP_ALIVE_PERIOD

public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD
See Also:
Constant Field Values

maxProducersToAudit

protected int maxProducersToAudit

maxAuditDepth

protected int maxAuditDepth

enableAudit

protected boolean enableAudit

auditRecoveryDepth

protected int auditRecoveryDepth

audit

protected ActiveMQMessageAudit audit

sequenceGenerator

protected LongSequenceGenerator sequenceGenerator

maxRows

protected int maxRows
Constructor Detail

JDBCPersistenceAdapter

public JDBCPersistenceAdapter()

JDBCPersistenceAdapter

public JDBCPersistenceAdapter(DataSource ds,
                              WireFormat wireFormat)
Method Detail

getDestinations

public Set<ActiveMQDestination> getDestinations()
Description copied from interface: PersistenceAdapter
Returns a set of all the ActiveMQDestination objects that the persistence store is aware exist.

Specified by:
getDestinations in interface PersistenceAdapter
Returns:
active destinations

createMessageAudit

protected void createMessageAudit()

initSequenceIdGenerator

public void initSequenceIdGenerator()

createQueueMessageStore

public MessageStore createQueueMessageStore(ActiveMQQueue destination)
                                     throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new queue message store with the given destination name

Specified by:
createQueueMessageStore in interface PersistenceAdapter
Returns:
the message store
Throws:
IOException

createTopicMessageStore

public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination)
                                          throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new topic message store with the given destination name

Specified by:
createTopicMessageStore in interface PersistenceAdapter
Returns:
the topic message store
Throws:
IOException

removeQueueMessageStore

public void removeQueueMessageStore(ActiveMQQueue destination)
Cleanup method to remove any state associated with the given destination

Specified by:
removeQueueMessageStore in interface PersistenceAdapter
Parameters:
destination - Destination to forget

removeTopicMessageStore

public void removeTopicMessageStore(ActiveMQTopic destination)
Cleanup method to remove any state associated with the given destination. No state is retained, so there is nothing to do.

Specified by:
removeTopicMessageStore in interface PersistenceAdapter
Parameters:
destination - Destination to forget

createTransactionStore

public TransactionStore createTransactionStore()
                                        throws IOException
Description copied from interface: PersistenceAdapter
Factory method to create a new persistent prepared transaction store for XA recovery

Specified by:
createTransactionStore in interface PersistenceAdapter
Returns:
transaction store
Throws:
IOException

getLastMessageBrokerSequenceId

public long getLastMessageBrokerSequenceId()
                                    throws IOException
Specified by:
getLastMessageBrokerSequenceId in interface PersistenceAdapter
Returns:
last broker sequence
Throws:
IOException

getLastProducerSequenceId

public long getLastProducerSequenceId(ProducerId id)
                               throws IOException
Description copied from interface: PersistenceAdapter
Returns the last stored producer sequenceId for this producer Id; used to suppress duplicate sends at the transport when a failover reconnect occurs.

Specified by:
getLastProducerSequenceId in interface PersistenceAdapter
Parameters:
id - the producerId to find a sequenceId for
Returns:
the last stored sequence id or -1 if no suppression needed
Throws:
IOException

init

public void init()
          throws Exception
Description copied from class: LockableServiceSupport
Initialize resources before locking

Specified by:
init in class LockableServiceSupport
Throws:
Exception

doStart

public void doStart()
             throws Exception
Specified by:
doStart in class ServiceSupport
Throws:
Exception

doStop

public void doStop(ServiceStopper stopper)
            throws Exception
Specified by:
doStop in class ServiceSupport
Throws:
Exception

cleanup

public void cleanup()

setScheduledThreadPoolExecutor

public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon)

getScheduledThreadPoolExecutor

public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor()
Overrides:
getScheduledThreadPoolExecutor in class LockableServiceSupport

getAdapter

public JDBCAdapter getAdapter()
                       throws IOException
Throws:
IOException

getDatabaseLocker

@Deprecated
public Locker getDatabaseLocker()
                         throws IOException
Deprecated. as of 5.7.0, replaced by LockableServiceSupport.getLocker()

Throws:
IOException

setDatabaseLocker

public void setDatabaseLocker(Locker locker)
                       throws IOException
Deprecated. as of 5.7.0, replaced by LockableServiceSupport.setLocker(org.apache.activemq.broker.Locker)

Sets the database locker strategy to use to lock the database on startup

Throws:
IOException

getLockDataSource

public DataSource getLockDataSource()
                             throws IOException
Throws:
IOException

setLockDataSource

public void setLockDataSource(DataSource dataSource)

getBrokerService

public BrokerService getBrokerService()

createAdapter

protected JDBCAdapter createAdapter()
                             throws IOException
Throws:
IOException

setAdapter

public void setAdapter(JDBCAdapter adapter)

getWireFormat

public WireFormat getWireFormat()

setWireFormat

public void setWireFormat(WireFormat wireFormat)

getTransactionContext

public TransactionContext getTransactionContext(ConnectionContext context)
                                         throws IOException
Throws:
IOException

getTransactionContext

public TransactionContext getTransactionContext()
                                         throws IOException
Throws:
IOException

beginTransaction

public void beginTransaction(ConnectionContext context)
                      throws IOException
Description copied from interface: PersistenceAdapter
This method starts a transaction on the persistent storage - which has nothing to do with JMS or XA transactions - it is purely a mechanism to perform multiple writes to a persistent store in one transaction as a performance optimization.

Typically one transaction will require one disk synchronization point, and so for really high performance it is usually faster to perform many writes within the same transaction to minimize latency caused by disk synchronization. This is especially true when using tools like Berkeley DB or embedded JDBC servers.

Specified by:
beginTransaction in interface PersistenceAdapter
Throws:
IOException

commitTransaction

public void commitTransaction(ConnectionContext context)
                       throws IOException
Description copied from interface: PersistenceAdapter
Commit a persistence transaction

Specified by:
commitTransaction in interface PersistenceAdapter
Throws:
IOException
See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)

rollbackTransaction

public void rollbackTransaction(ConnectionContext context)
                         throws IOException
Description copied from interface: PersistenceAdapter
Rollback a persistence transaction

Specified by:
rollbackTransaction in interface PersistenceAdapter
Throws:
IOException
See Also:
PersistenceAdapter.beginTransaction(ConnectionContext context)

getCleanupPeriod

public int getCleanupPeriod()

setCleanupPeriod

public void setCleanupPeriod(int cleanupPeriod)
Sets the interval, in milliseconds, between attempts to clean up the database for durable topics.


isChangeAutoCommitAllowed

public boolean isChangeAutoCommitAllowed()

setChangeAutoCommitAllowed

public void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed)
Whether the JDBC driver allows the auto-commit setting to be changed. Some drivers do not allow changing the auto-commit setting. The default value is true.

Parameters:
changeAutoCommitAllowed - true to change, false to not change.

deleteAllMessages

public void deleteAllMessages()
                       throws IOException
Description copied from interface: PersistenceAdapter
Deletes all the messages in the persistent store.

Specified by:
deleteAllMessages in interface PersistenceAdapter
Throws:
IOException

isUseExternalMessageReferences

public boolean isUseExternalMessageReferences()

setUseExternalMessageReferences

public void setUseExternalMessageReferences(boolean useExternalMessageReferences)

isCreateTablesOnStartup

public boolean isCreateTablesOnStartup()

setCreateTablesOnStartup

public void setCreateTablesOnStartup(boolean createTablesOnStartup)
Sets whether or not tables are created on startup


setUseDatabaseLock

@Deprecated
public void setUseDatabaseLock(boolean useDatabaseLock)
Deprecated. Use LockableServiceSupport.setUseLock(boolean) instead. Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default.


log

public static void log(String msg,
                       SQLException e)

getStatements

public Statements getStatements()

setStatements

public void setStatements(Statements statements)

setUsageManager

public void setUsageManager(SystemUsage usageManager)
Specified by:
setUsageManager in interface PersistenceAdapter
Parameters:
usageManager - The UsageManager that is controlling the destination's memory usage.

createDefaultLocker

public Locker createDefaultLocker()
                           throws IOException
Description copied from interface: Lockable
Create a default locker

Specified by:
createDefaultLocker in interface Lockable
Returns:
default locker
Throws:
IOException

setBrokerName

public void setBrokerName(String brokerName)
Description copied from interface: PersistenceAdapter
Set the name of the broker using the adapter

Specified by:
setBrokerName in interface PersistenceAdapter

toString

public String toString()
Overrides:
toString in class DataSourceServiceSupport

setDirectory

public void setDirectory(File dir)
Description copied from interface: PersistenceAdapter
Set the directory where any data files should be created

Specified by:
setDirectory in interface PersistenceAdapter

getDirectory

public File getDirectory()
Specified by:
getDirectory in interface PersistenceAdapter
Returns:
the directory used by the persistence adaptor

checkpoint

public void checkpoint(boolean sync)
                throws IOException
Description copied from interface: PersistenceAdapter
checkpoint any

Specified by:
checkpoint in interface PersistenceAdapter
Throws:
IOException

size

public long size()
Description copied from interface: PersistenceAdapter
A hint to return the size of the store on disk

Specified by:
size in interface PersistenceAdapter
Returns:
disk space used in bytes, or 0 if not implemented

setLockAcquireSleepInterval

public void setLockAcquireSleepInterval(long lockAcquireSleepInterval)
                                 throws IOException
Deprecated. Use Locker.setLockAcquireSleepInterval(long) instead. Millisecond interval between lock acquire attempts; applied to a newly created DefaultDatabaseLocker, not applied if a DataBaseLocker is injected.

Throws:
IOException

setTransactionIsolation

public void setTransactionIsolation(int transactionIsolation)
Sets the transaction isolation level to something other than TRANSACTION_READ_UNCOMMITTED. This allowable dirty isolation level may not be achievable in clustered DB environments, so a more restrictive and expensive option may be needed, like TRANSACTION_REPEATABLE_READ. See the isolation level constants in Connection.

Parameters:
transactionIsolation - the isolation level to use

getMaxProducersToAudit

public int getMaxProducersToAudit()

setMaxProducersToAudit

public void setMaxProducersToAudit(int maxProducersToAudit)

getMaxAuditDepth

public int getMaxAuditDepth()

setMaxAuditDepth

public void setMaxAuditDepth(int maxAuditDepth)

isEnableAudit

public boolean isEnableAudit()

setEnableAudit

public void setEnableAudit(boolean enableAudit)

getAuditRecoveryDepth

public int getAuditRecoveryDepth()

setAuditRecoveryDepth

public void setAuditRecoveryDepth(int auditRecoveryDepth)

getNextSequenceId

public long getNextSequenceId()

getMaxRows

public int getMaxRows()

setMaxRows

public void setMaxRows(int maxRows)

recover

public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore)
             throws IOException
Throws:
IOException

commitAdd

public void commitAdd(ConnectionContext context,
                      MessageId messageId)
               throws IOException
Throws:
IOException

commitRemove

public void commitRemove(ConnectionContext context,
                         MessageAck ack)
                  throws IOException
Throws:
IOException

commitLastAck

public void commitLastAck(ConnectionContext context,
                          long xidLastAck,
                          long priority,
                          ActiveMQDestination destination,
                          String subName,
                          String clientId)
                   throws IOException
Throws:
IOException

rollbackLastAck

public void rollbackLastAck(ConnectionContext context,
                            JDBCTopicMessageStore store,
                            MessageAck ack,
                            String subName,
                            String clientId)
                     throws IOException
Throws:
IOException

rollbackLastAck

public void rollbackLastAck(ConnectionContext context,
                            byte priority,
                            ActiveMQDestination destination,
                            String subName,
                            String clientId)
                     throws IOException
Throws:
IOException


Copyright © 2005-2013 The Apache Software Foundation. All Rights Reserved.