org.apache.activemq.store.amq
Class AMQMessageStore

java.lang.Object
  extended by org.apache.activemq.store.AbstractMessageStore
      extended by org.apache.activemq.store.amq.AMQMessageStore
All Implemented Interfaces:
Service, MessageStore
Direct Known Subclasses:
AMQTopicMessageStore

public class AMQMessageStore
extends AbstractMessageStore

A MessageStore that uses a Journal to store it's messages.


Field Summary
protected  TaskRunner asyncWriteTask
           
protected  CountDownLatch flushLatch
           
protected  Set<Location> inFlightTxLocations
           
protected  Location lastLocation
           
protected  Location lastWrittenLocation
           
protected  Lock lock
           
protected  AMQPersistenceAdapter peristenceAdapter
           
protected  ReferenceStore referenceStore
           
protected  AMQTransactionStore transactionStore
           
protected  TransactionTemplate transactionTemplate
           
 
Fields inherited from class org.apache.activemq.store.AbstractMessageStore
destination, FUTURE, prioritizedMessages
 
Constructor Summary
AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination)
           
 
Method Summary
 void addMessage(ConnectionContext context, Message message)
          Not synchronize since the Journal has better throughput if you increase the number of concurrent writes that it is doing.
 void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef)
           
 void dispose(ConnectionContext context)
           
protected  Location doAsyncWrite()
           
 void flush()
          Waits till the lastest data has landed on the referenceStore
protected  Location getLocation(MessageId messageId)
           
 Location getMark()
           
 Message getMessage(MessageId identity)
          Looks up a message using either the String messageID or the messageNumber.
 int getMessageCount()
           
 String getMessageReference(MessageId identity)
           
 ReferenceStore getReferenceStore()
           
 void recover(MessageRecoveryListener listener)
          Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the transaction log and then the cache is updated.
 void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
           
 void removeAllMessages(ConnectionContext context)
          Removes all the messages from the message store.
 void removeMessage(ConnectionContext context, MessageAck ack)
          Removes a message from the message store.
 boolean replayAddMessage(ConnectionContext context, Message message, Location location)
           
 boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck)
           
 void resetBatching()
          A hint to the Store to reset any batching state for the Destination
 void setBatch(MessageId messageId)
          allow caching cursors to set the current batch offset when cache is exhausted
 void setMemoryUsage(MemoryUsage memoryUsage)
           
 void start()
           
 void stop()
           
 
Methods inherited from class org.apache.activemq.store.AbstractMessageStore
addMessage, asyncAddQueueMessage, asyncAddQueueMessage, asyncAddTopicMessage, asyncAddTopicMessage, getDestination, isEmpty, isPrioritizedMessages, removeAsyncMessage, setPrioritizedMessages
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

peristenceAdapter

protected final AMQPersistenceAdapter peristenceAdapter

transactionStore

protected final AMQTransactionStore transactionStore

referenceStore

protected final ReferenceStore referenceStore

transactionTemplate

protected final TransactionTemplate transactionTemplate

lastLocation

protected Location lastLocation

lastWrittenLocation

protected Location lastWrittenLocation

inFlightTxLocations

protected Set<Location> inFlightTxLocations

asyncWriteTask

protected final TaskRunner asyncWriteTask

flushLatch

protected CountDownLatch flushLatch

lock

protected final Lock lock
Constructor Detail

AMQMessageStore

public AMQMessageStore(AMQPersistenceAdapter adapter,
                       ReferenceStore referenceStore,
                       ActiveMQDestination destination)
Method Detail

setMemoryUsage

public void setMemoryUsage(MemoryUsage memoryUsage)
Specified by:
setMemoryUsage in interface MessageStore
Overrides:
setMemoryUsage in class AbstractMessageStore
Parameters:
memoryUsage - The SystemUsage that is controlling the destination's memory usage.

addMessage

public final void addMessage(ConnectionContext context,
                             Message message)
                      throws IOException
Not synchronize since the Journal has better throughput if you increase the number of concurrent writes that it is doing.

Parameters:
context - context
Throws:
IOException

replayAddMessage

public boolean replayAddMessage(ConnectionContext context,
                                Message message,
                                Location location)

removeMessage

public void removeMessage(ConnectionContext context,
                          MessageAck ack)
                   throws IOException
Description copied from interface: MessageStore
Removes a message from the message store.

ack - the ack request that cause the message to be removed. It conatins the identity which contains the messageID of the message that needs to be removed.
Throws:
IOException

replayRemoveMessage

public boolean replayRemoveMessage(ConnectionContext context,
                                   MessageAck messageAck)

flush

public void flush()
           throws InterruptedIOException
Waits till the lastest data has landed on the referenceStore

Throws:
InterruptedIOException

doAsyncWrite

protected Location doAsyncWrite()
                         throws IOException
Returns:
Throws:
IOException

getMessage

public Message getMessage(MessageId identity)
                   throws IOException
Description copied from interface: MessageStore
Looks up a message using either the String messageID or the messageNumber. Implementations are encouraged to fill in the missing key if its easy to do so.

Parameters:
identity - which contains either the messageID or the messageNumber
Returns:
the message or null if it does not exist
Throws:
IOException

getLocation

protected Location getLocation(MessageId messageId)
                        throws IOException
Throws:
IOException

recover

public void recover(MessageRecoveryListener listener)
             throws Exception
Replays the referenceStore first as those messages are the oldest ones, then messages are replayed from the transaction log and then the cache is updated.

Parameters:
listener -
Throws:
Exception

start

public void start()
           throws Exception
Specified by:
start in interface Service
Overrides:
start in class AbstractMessageStore
Throws:
Exception

stop

public void stop()
          throws Exception
Specified by:
stop in interface Service
Overrides:
stop in class AbstractMessageStore
Throws:
Exception

getReferenceStore

public ReferenceStore getReferenceStore()
Returns:
Returns the longTermStore.

removeAllMessages

public void removeAllMessages(ConnectionContext context)
                       throws IOException
Description copied from interface: MessageStore
Removes all the messages from the message store.

Throws:
IOException
See Also:
MessageStore.removeAllMessages(ConnectionContext)

addMessageReference

public void addMessageReference(ConnectionContext context,
                                MessageId messageId,
                                long expirationTime,
                                String messageRef)
                         throws IOException
Throws:
IOException

getMessageReference

public String getMessageReference(MessageId identity)
                           throws IOException
Throws:
IOException

getMessageCount

public int getMessageCount()
                    throws IOException
Returns:
Throws:
IOException
See Also:
MessageStore.getMessageCount()

recoverNextMessages

public void recoverNextMessages(int maxReturned,
                                MessageRecoveryListener listener)
                         throws Exception
Throws:
Exception

resetBatching

public void resetBatching()
Description copied from interface: MessageStore
A hint to the Store to reset any batching state for the Destination


getMark

public Location getMark()

dispose

public void dispose(ConnectionContext context)
Specified by:
dispose in interface MessageStore
Overrides:
dispose in class AbstractMessageStore

setBatch

public void setBatch(MessageId messageId)
Description copied from interface: MessageStore
allow caching cursors to set the current batch offset when cache is exhausted

Specified by:
setBatch in interface MessageStore
Overrides:
setBatch in class AbstractMessageStore


Copyright © 2005-2012 The Apache Software Foundation. All Rights Reserved.