Producer Flow Control

In ActiveMQ 4.x flow control was implemented using TCP flow control. The underlying network connection of throttled consumers was suspended to enforce flow control limits. This strategy is very efficient but can lead to deadlocks if there are multiple producers and consumers sharing the same connection.

As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection. Messages that are sent synchronously will automatically use per producer flow control. For producers that use Async Sends, you will need to configure the ProducerWindowSize connection option so that even async messages are flow controlled per producer.

ActiveMQConnectionFactory connctionFactory = ...
connctionFactory.setProducerWindowSize(1024000);

The ProducerWindowSize is the maximum number of bytes of data that a producer will transmit to a broker before waiting for acknowledgment messages from the broker that it has accepted the previously sent messages. In other words, this how you configure the producer flow control window that is used for async sends.

You can disable flow control by setting the producerFlowControl flag to false on a destination policy in the Broker configuration - e.g.

<destinationPolicy>
      <policyMap>
        <policyEntries>

          <policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="1mb">
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>

        </policyEntries>
      </policyMap>
</destinationPolicy>

see Broker Configuration

How Producer Flow Control works

If you are sending a persistent message (so that a response of the OpenWire Message is expected then the broker will send the producer a ProducerAck message. This informs the producer that the previous sending window has been processed, so that it can now send another window. Its kinda like consumer acks but in reverse.

Advantage

So a nice producer might wait for a producer ack before sending more data, to avoid flooding the broker (and forcing the broker to block the entire connection if a slow consumer occurs). To see how this works in source code, check out the ActiveMQMessageProducer code.

Though a client can ignore the producer ACKs altogether and the broker should just stall the transport if it has to for slow consumer handling; though this does mean it'll stall the entire connection.

Disabling Flow Control

A common requirement is to disable flow control so that message dispatching continues until all available disk is used up by pending messages (whether persistent or non persistent messaging is configured). To do this enable Message Cursors.

System usage

You can also slowdown producers using the <systemUsage> configuration tag. Take a look at the following example:

<systemUsage>
  <systemUsage>
    <memoryUsage>
      <memoryUsage limit="20 mb"/>
    </memoryUsage>
    <storeUsage>
      <storeUsage limit="1 gb" name="foo"/>
    </storeUsage>
    <tempUsage>
      <tempUsage limit="100 mb"/>
    </tempUsage>
  </systemUsage>
</systemUsage>

You can set limits of memory for NON_PERSISTENT messages, disk storage for PERSITENT messages and total usage for temporary messages, the broker will use before it slowdown producers. In this default settings, the broker will block the send() call until some messages are consumed. In case that you set sendFailIfNoSpace property to true, like in this example:

<systemUsage>
 <systemUsage sendFailIfNoSpace="true">
   <memoryUsage>
     <memoryUsage limit="20 mb">
   </memoryUsage>
 </systemUsage>
</systemUsage>

the broker will cause the send() call to fail.

Graphic Design By Hiram