org.apache.activemq.network
Class DemandForwardingBridgeSupport

java.lang.Object
  extended by org.apache.activemq.network.DemandForwardingBridgeSupport
All Implemented Interfaces:
BrokerServiceAware, NetworkBridge, Service
Direct Known Subclasses:
CompositeDemandForwardingBridge, DemandForwardingBridge

public abstract class DemandForwardingBridgeSupport
extends Object
implements NetworkBridge, BrokerServiceAware

A useful base class for implementing demand forwarding bridges.


Field Summary
protected  Object brokerInfoMutex
           
protected  NetworkBridgeConfiguration configuration
           
protected  LongSequenceGenerator consumerIdGenerator
           
protected  NetworkBridgeFilterFactory defaultFilterFactory
           
protected  int demandConsumerDispatched
           
protected  ConsumerInfo demandConsumerInfo
           
protected  AtomicBoolean disposed
           
protected static String DURABLE_SUB_PREFIX
           
protected  ActiveMQDestination[] durableDestinations
           
protected  ActiveMQDestination[] dynamicallyIncludedDestinations
           
protected  ActiveMQDestination[] excludedDestinations
           
protected  IdGenerator idGenerator
           
protected  AtomicBoolean lastConnectSucceeded
           
protected  AtomicBoolean localBridgeStarted
           
protected  Transport localBroker
           
protected  BrokerId localBrokerId
           
protected  BrokerId[] localBrokerPath
           
protected  String localClientId
           
protected  ConnectionInfo localConnectionInfo
           
protected  SessionInfo localSessionInfo
           
protected  CountDownLatch localStartedLatch
           
protected  ProducerInfo producerInfo
           
protected  AtomicBoolean remoteBridgeStarted
           
protected  Transport remoteBroker
           
protected  BrokerId remoteBrokerId
           
protected  String remoteBrokerName
           
protected  BrokerId[] remoteBrokerPath
           
protected  ConnectionInfo remoteConnectionInfo
           
protected  CountDownLatch startedLatch
           
protected  ActiveMQDestination[] staticallyIncludedDestinations
           
protected  ConcurrentHashMap<ConsumerId,DemandSubscription> subscriptionMapByLocalId
           
protected  ConcurrentHashMap<ConsumerId,DemandSubscription> subscriptionMapByRemoteId
           
 
Constructor Summary
DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker)
           
 
Method Summary
protected  boolean addConsumerInfo(ConsumerInfo consumerInfo)
           
protected  void addRemoteBrokerToBrokerPath(ConsumerInfo info)
           
protected  void addSubscription(DemandSubscription sub)
           
protected  BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend)
           
protected  BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend)
           
protected  void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub)
           
protected  Message configureMessage(MessageDispatch md)
           
static boolean contains(BrokerId[] brokerPath, BrokerId brokerId)
           
protected  DemandSubscription createDemandSubscription(ActiveMQDestination destination)
           
protected  DemandSubscription createDemandSubscription(ConsumerInfo info)
           
protected  NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info)
           
protected  DemandSubscription doCreateDemandSubscription(ConsumerInfo info)
           
 void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo)
           
protected  Service getControllingService()
           
 long getDequeueCounter()
           
 ActiveMQDestination[] getDurableDestinations()
           
 ActiveMQDestination[] getDynamicallyIncludedDestinations()
           
 long getEnqueueCounter()
           
 ActiveMQDestination[] getExcludedDestinations()
           
 String getLocalAddress()
           
 Transport getLocalBroker()
           
 String getLocalBrokerName()
           
 ConcurrentHashMap<ConsumerId,DemandSubscription> getLocalSubscriptionMap()
           
 ObjectName getMbeanObjectName()
           
 String getRemoteAddress()
           
 Transport getRemoteBroker()
           
 String getRemoteBrokerName()
           
protected  BrokerId[] getRemoteBrokerPath()
           
 ActiveMQDestination[] getStaticallyIncludedDestinations()
           
 boolean isCreatedByDuplex()
           
protected  boolean isDuplex()
           
protected  boolean isPermissableDestination(ActiveMQDestination destination)
           
protected  boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary)
           
protected  void removeDemandSubscription(ConsumerId id)
           
protected  boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId)
           
protected  void removeSubscription(DemandSubscription sub)
           
protected  void safeWaitUntilStarted()
          Performs a timed wait on the started latch; each time the wait times out, checks for disposed before performing another wait.
protected  void serviceLocalBrokerInfo(Command command)
           
protected  void serviceLocalCommand(Command command)
           
 void serviceLocalException(Throwable error)
          Service an exception received from the Local Broker connection.
protected  void serviceRemoteBrokerInfo(Command command)
           
protected  void serviceRemoteCommand(Command command)
           
 void serviceRemoteException(Throwable error)
          Service an exception received from the Remote Broker connection.
 void setBrokerService(BrokerService brokerService)
           
 void setCreatedByDuplex(boolean createdByDuplex)
           
 void setDurableDestinations(ActiveMQDestination[] durableDestinations)
           
 void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations)
           
 void setExcludedDestinations(ActiveMQDestination[] excludedDestinations)
           
 void setMbeanObjectName(ObjectName objectName)
           
 void setNetworkBridgeListener(NetworkBridgeListener listener)
          Sets the NetworkBridgeListener.
 void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations)
           
protected  void setupStaticDestinations()
          Subscriptions for these destinations are always created
 void start()
           
protected  void startRemoteBridge()
           
 void stop()
           
protected  void triggerLocalStartBridge()
           
protected  void triggerRemoteStartBridge()
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DURABLE_SUB_PREFIX

protected static final String DURABLE_SUB_PREFIX
See Also:
Constant Field Values

localBroker

protected final Transport localBroker

remoteBroker

protected final Transport remoteBroker

idGenerator

protected final IdGenerator idGenerator

consumerIdGenerator

protected final LongSequenceGenerator consumerIdGenerator

localConnectionInfo

protected ConnectionInfo localConnectionInfo

remoteConnectionInfo

protected ConnectionInfo remoteConnectionInfo

localSessionInfo

protected SessionInfo localSessionInfo

producerInfo

protected ProducerInfo producerInfo

remoteBrokerName

protected String remoteBrokerName

localClientId

protected String localClientId

demandConsumerInfo

protected ConsumerInfo demandConsumerInfo

demandConsumerDispatched

protected int demandConsumerDispatched

localBridgeStarted

protected final AtomicBoolean localBridgeStarted

remoteBridgeStarted

protected final AtomicBoolean remoteBridgeStarted

disposed

protected AtomicBoolean disposed

localBrokerId

protected BrokerId localBrokerId

excludedDestinations

protected ActiveMQDestination[] excludedDestinations

dynamicallyIncludedDestinations

protected ActiveMQDestination[] dynamicallyIncludedDestinations

staticallyIncludedDestinations

protected ActiveMQDestination[] staticallyIncludedDestinations

durableDestinations

protected ActiveMQDestination[] durableDestinations

subscriptionMapByLocalId

protected final ConcurrentHashMap<ConsumerId,DemandSubscription> subscriptionMapByLocalId

subscriptionMapByRemoteId

protected final ConcurrentHashMap<ConsumerId,DemandSubscription> subscriptionMapByRemoteId

localBrokerPath

protected final BrokerId[] localBrokerPath

startedLatch

protected final CountDownLatch startedLatch

localStartedLatch

protected final CountDownLatch localStartedLatch

lastConnectSucceeded

protected final AtomicBoolean lastConnectSucceeded

configuration

protected NetworkBridgeConfiguration configuration

defaultFilterFactory

protected final NetworkBridgeFilterFactory defaultFilterFactory

remoteBrokerPath

protected final BrokerId[] remoteBrokerPath

brokerInfoMutex

protected Object brokerInfoMutex

remoteBrokerId

protected BrokerId remoteBrokerId
Constructor Detail

DemandForwardingBridgeSupport

public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration,
                                     Transport localBroker,
                                     Transport remoteBroker)
Method Detail

duplexStart

public void duplexStart(TransportConnection connection,
                        BrokerInfo localBrokerInfo,
                        BrokerInfo remoteBrokerInfo)
                 throws Exception
Throws:
Exception

start

public void start()
           throws Exception
Specified by:
start in interface Service
Throws:
Exception

triggerLocalStartBridge

protected void triggerLocalStartBridge()
                                throws IOException
Throws:
IOException

triggerRemoteStartBridge

protected void triggerRemoteStartBridge()
                                 throws IOException
Throws:
IOException

startRemoteBridge

protected void startRemoteBridge()
                          throws Exception
Throws:
Exception

stop

public void stop()
          throws Exception
Specified by:
stop in interface Service
Throws:
Exception

serviceRemoteException

public void serviceRemoteException(Throwable error)
Description copied from interface: NetworkBridge
Service an exception received from the Remote Broker connection.

Specified by:
serviceRemoteException in interface NetworkBridge

serviceRemoteCommand

protected void serviceRemoteCommand(Command command)

serviceLocalException

public void serviceLocalException(Throwable error)
Description copied from interface: NetworkBridge
Service an exception received from the Local Broker connection.

Specified by:
serviceLocalException in interface NetworkBridge

getControllingService

protected Service getControllingService()

addSubscription

protected void addSubscription(DemandSubscription sub)
                        throws IOException
Throws:
IOException

removeSubscription

protected void removeSubscription(DemandSubscription sub)
                           throws IOException
Throws:
IOException

configureMessage

protected Message configureMessage(MessageDispatch md)
                            throws IOException
Throws:
IOException

serviceLocalCommand

protected void serviceLocalCommand(Command command)

getDynamicallyIncludedDestinations

public ActiveMQDestination[] getDynamicallyIncludedDestinations()
Returns:
Returns the dynamicallyIncludedDestinations.

setDynamicallyIncludedDestinations

public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations)
Parameters:
dynamicallyIncludedDestinations - The dynamicallyIncludedDestinations to set.

getExcludedDestinations

public ActiveMQDestination[] getExcludedDestinations()
Returns:
Returns the excludedDestinations.

setExcludedDestinations

public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations)
Parameters:
excludedDestinations - The excludedDestinations to set.

getStaticallyIncludedDestinations

public ActiveMQDestination[] getStaticallyIncludedDestinations()
Returns:
Returns the staticallyIncludedDestinations.

setStaticallyIncludedDestinations

public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations)
Parameters:
staticallyIncludedDestinations - The staticallyIncludedDestinations to set.

getDurableDestinations

public ActiveMQDestination[] getDurableDestinations()
Returns:
Returns the durableDestinations.

setDurableDestinations

public void setDurableDestinations(ActiveMQDestination[] durableDestinations)
Parameters:
durableDestinations - The durableDestinations to set.

getLocalBroker

public Transport getLocalBroker()
Returns:
Returns the localBroker.

getRemoteBroker

public Transport getRemoteBroker()
Returns:
Returns the remoteBroker.

isCreatedByDuplex

public boolean isCreatedByDuplex()
Returns:
the createdByDuplex

setCreatedByDuplex

public void setCreatedByDuplex(boolean createdByDuplex)
Parameters:
createdByDuplex - the createdByDuplex to set

contains

public static boolean contains(BrokerId[] brokerPath,
                               BrokerId brokerId)

appendToBrokerPath

protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath,
                                        BrokerId[] pathsToAppend)

appendToBrokerPath

protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath,
                                        BrokerId idToAppend)

isPermissableDestination

protected boolean isPermissableDestination(ActiveMQDestination destination)

isPermissableDestination

protected boolean isPermissableDestination(ActiveMQDestination destination,
                                           boolean allowTemporary)

setupStaticDestinations

protected void setupStaticDestinations()
Subscriptions for these destinations are always created


addConsumerInfo

protected boolean addConsumerInfo(ConsumerInfo consumerInfo)
                           throws IOException
Throws:
IOException

createDemandSubscription

protected DemandSubscription createDemandSubscription(ConsumerInfo info)
                                               throws IOException
Throws:
IOException

doCreateDemandSubscription

protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info)
                                                 throws IOException
Throws:
IOException

createDemandSubscription

protected final DemandSubscription createDemandSubscription(ActiveMQDestination destination)

configureDemandSubscription

protected void configureDemandSubscription(ConsumerInfo info,
                                           DemandSubscription sub)
                                    throws IOException
Throws:
IOException

removeDemandSubscription

protected void removeDemandSubscription(ConsumerId id)
                                 throws IOException
Throws:
IOException

removeDemandSubscriptionByLocalId

protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId)

safeWaitUntilStarted

protected void safeWaitUntilStarted()
                             throws InterruptedException
Performs a timed wait on the started latch; each time the wait times out, checks for disposed before performing another wait.

Throws:
InterruptedException

createNetworkBridgeFilter

protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info)
                                                 throws IOException
Throws:
IOException

serviceLocalBrokerInfo

protected void serviceLocalBrokerInfo(Command command)
                               throws InterruptedException
Throws:
InterruptedException

addRemoteBrokerToBrokerPath

protected void addRemoteBrokerToBrokerPath(ConsumerInfo info)
                                    throws IOException
Throws:
IOException

serviceRemoteBrokerInfo

protected void serviceRemoteBrokerInfo(Command command)
                                throws IOException
Throws:
IOException

getRemoteBrokerPath

protected BrokerId[] getRemoteBrokerPath()

setNetworkBridgeListener

public void setNetworkBridgeListener(NetworkBridgeListener listener)
Description copied from interface: NetworkBridge
Sets the NetworkBridgeListener.

Specified by:
setNetworkBridgeListener in interface NetworkBridge

getRemoteAddress

public String getRemoteAddress()
Specified by:
getRemoteAddress in interface NetworkBridge
Returns:
the network address of the remote broker connection.

getLocalAddress

public String getLocalAddress()
Specified by:
getLocalAddress in interface NetworkBridge
Returns:
the network address of the local broker connection.

getRemoteBrokerName

public String getRemoteBrokerName()
Specified by:
getRemoteBrokerName in interface NetworkBridge
Returns:
the name of the remote broker this bridge is connected to.

getLocalBrokerName

public String getLocalBrokerName()
Specified by:
getLocalBrokerName in interface NetworkBridge
Returns:
the name of the local broker this bridge is connected to.

getDequeueCounter

public long getDequeueCounter()
Specified by:
getDequeueCounter in interface NetworkBridge
Returns:
the current number of dequeues this bridge has.

getEnqueueCounter

public long getEnqueueCounter()
Specified by:
getEnqueueCounter in interface NetworkBridge
Returns:
the current number of enqueues this bridge has.

isDuplex

protected boolean isDuplex()

getLocalSubscriptionMap

public ConcurrentHashMap<ConsumerId,DemandSubscription> getLocalSubscriptionMap()

setBrokerService

public void setBrokerService(BrokerService brokerService)
Specified by:
setBrokerService in interface BrokerServiceAware

setMbeanObjectName

public void setMbeanObjectName(ObjectName objectName)
Specified by:
setMbeanObjectName in interface NetworkBridge
Parameters:
objectName - The ObjectName assigned to this bridge in the MBean server.

getMbeanObjectName

public ObjectName getMbeanObjectName()
Specified by:
getMbeanObjectName in interface NetworkBridge
Returns:
the MBean name used to identify this bridge in the MBean server.


Copyright © 2005-2012 The Apache Software Foundation. All Rights Reserved.