Dispatch Policies

ActiveMQ 4.0 or later supports the configuration of different dispatch policies on a per destination (or wildcard) basis. Before discussing dispatch policies its worth first understanding the purpose of the prefetch value.

The out of the box configuration of ActiveMQ is designed for high performance and high throughput messaging where there are lots of messages that need to be dispatched to consumers as quickly as possible. So the default prefetch values are fairly large and the default dispatch policy will try and fill the prefetch buffers as quickly as possible.

However with messaging there are many use cases and sometimes the default configuration is not ideal to your use case; when you send a small number of messages, they tend to all go to one consumer unless you've lots of messages. If you have a large number of consumers and a relatively high prefetch value and you have a small number of messages that each message takes quite a while to process then the default dispatch policy might result in increasing the amount of time it takes to process all the messages (since the load balancing is not fair for small numbers of messages).

Round Robin Policy

To load balance low numbers of messages more fairly you might want to switch to the <roundRobinDispatchPolicy/> which tries to dispatch messages fairly at the expense of some performance.

Here is an example of this in use.

<beans 
  xmlns="http://www.springframework.org/schema/beans" 
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

  <broker persistent="false" brokerName="${brokername}" xmlns="http://activemq.apache.org/schema/core">

    <!--  lets define the dispatch policy -->
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="FOO.>">
            <dispatchPolicy>
              <roundRobinDispatchPolicy />
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy />
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry topic="ORDERS.>">
            <dispatchPolicy>
              <strictOrderDispatchPolicy />
            </dispatchPolicy>
            <!--  1 minutes worth -->
            <subscriptionRecoveryPolicy>
              <timedSubscriptionRecoveryPolicy recoverDuration="60000" />
            </subscriptionRecoveryPolicy>
          </policyEntry>

          <policyEntry topic="PRICES.>">

            <!-- lets force old messages to be discarded for slow consumers -->
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="10"/>
            </pendingMessageLimitStrategy>

            <!--  10 seconds worth -->
            <subscriptionRecoveryPolicy>
              <timedSubscriptionRecoveryPolicy recoverDuration="10000" />
            </subscriptionRecoveryPolicy>
            
          </policyEntry>
          <policyEntry tempTopic="true" advisoryForConsumed="true" />

          <policyEntry tempQueue="true" advisoryForConsumed="true" />
        </policyEntries>
      </policyMap>
    </destinationPolicy>
  </broker>

</beans>
© 2004-2011 The Apache Software Foundation.
Apache ActiveMQ, ActiveMQ, Apache, the Apache feather logo, and the Apache ActiveMQ project logo are trademarks of The Apache Software Foundation. All other marks mentioned may be trademarks or registered trademarks of their respective owners.
Graphic Design By Hiram