Transactional ClientCamel recommends supporting the Transactional Client
Transaction Oriented Endpoints (Camel Toes) like JMS support using a transaction for both inbound and outbound message exchanges. Endpoints that support transactions will participate in the current transaction context that they are called from. You should use the SpringRouteBuilder For inbound endpoint to be transacted, they normally need to be configured to use a Spring PlatformTransactionManager You first define needed object in the spring configuration. <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> <property name="connectionFactory" ref="jmsConnectionFactory" /> </bean> <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> Then you look them up and use them to create the JmsComponent. PlatformTransactionManager transactionManager = (PlatformTransactionManager) spring.getBean("jmsTransactionManager"); ConnectionFactory connectionFactory = (ConnectionFactory) spring.getBean("jmsConnectionFactory"); JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager); component.getConfiguration().setConcurrentConsumers(1); ctx.addComponent("activemq", component); Transaction PoliciesOutbound endpoints will automatically enlist in the current transaction context. But what if you do not want your outbound endpoint to enlist in the same transaction as your inbound endpoint? The solution is to add a Transaction Policy to the processing route. You first have to define transaction policies that you will be using. The policies use a spring TransactionTemplate <bean id="PROPAGATION_REQUIRED" class="org.springframework.transaction.support.TransactionTemplate"> <property name="transactionManager" ref="jmsTransactionManager"/> </bean> <bean id="PROPAGATION_NOT_SUPPORTED" class="org.springframework.transaction.support.TransactionTemplate"> <property name="transactionManager" ref="jmsTransactionManager"/> <property name="propagationBehaviorName" value="PROPAGATION_NOT_SUPPORTED"/> </bean> <bean id="PROPAGATION_REQUIRES_NEW" class="org.springframework.transaction.support.TransactionTemplate"> <property name="transactionManager" ref="jmsTransactionManager"/> <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/> </bean> Then in your SpringRouteBuilder public void configure() { ... Policy requried = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_REQUIRED")); Policy notsupported = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_NOT_SUPPORTED")); Policy requirenew = new SpringTransactionPolicy(bean(TransactionTemplate.class, "PROPAGATION_REQUIRES_NEW")); ... } Once created, you can use the Policy objects in your processing routes: // Send to bar in a new transaction from("activemq:queue:foo").policy(requirenew).to("activemq:queue:bar"); // Send to bar without a transaction. from("activemq:queue:foo").policy(notsupported ).to("activemq:queue:bar"); Transaction Policies improvements in Camel 1.4In Camel 1.4 we have eased the syntax to setup the transaction polices directly on the SpringTransactionPolicy object: <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> <property name="transactionManager" ref="jmsTransactionManager"/> </bean> <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> <property name="transactionManager" ref="jmsTransactionManager"/> <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/> </bean> And the Java DSL is a bit simpler now: Policy requried = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRED"));
Database SampleIn this sample we want to ensure that two endpoints is under transaction control. These two endpoints inserts data into a database. First of all we setup the usual spring stuff in its configuration file. Here we have defined a DataSource to the HSQLDB and a most importantly We use the required transaction policy that we define as the PROPOGATION_REQUIRED spring bean. And as last we have our book service bean that does the business logic <!-- datasource to the database --> <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource"> <property name="driverClassName" value="org.hsqldb.jdbcDriver"/> <property name="url" value="jdbc:hsqldb:mem:camel"/> <property name="username" value="sa"/> <property name="password" value=""/> </bean> <!-- spring transaction manager --> <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource"/> </bean> <!-- policy for required transaction used in our Camel routes --> <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> <property name="transactionManager" ref="txManager"/> </bean> <!-- bean for book business logic --> <bean id="bookService" class="org.apache.camel.spring.interceptor.BookService"> <property name="dataSource" ref="dataSource"/> </bean> In our Camel route that is Java DSL based we setup the transactional policy, wrapped as a Policy. // Notice that we use the SpringRouteBuilder that has a few more features than // the standard RouteBuilder return new SpringRouteBuilder() { public void configure() throws Exception { // setup the transaction policy SpringTransactionPolicy required = context.getRegistry() .lookup("PROPAGATION_REQUIRED", SpringTransactionPolicy.class); // use this error handler instead of DeadLetterChannel that is the default // Notice: transactionErrorHandler is in SpringRouteBuilder if (useTransactionErrorHandler) { // useTransactionErrorHandler is only used for unit testing to reuse code // for doing a 2nd test without this transaction error handler, so ignore // this. For spring based transaction, end users is encured to use the // transaction error handler instead of the default DeadLetterChannel. errorHandler(transactionErrorHandler(required). // notice that the builder has builder methods for chained configuration maximumRedeliveries(3). initialRedeliveryDelay(5 * 1000L)); } Then we are ready to define our Camel routes. We have two routes: 1 for success conditions, and 1 for a forced rollback condition. // set the required policy for this route from("direct:okay").policy(required). setBody(constant("Tiger in Action")).beanRef("bookService"). setBody(constant("Elephant in Action")).beanRef("bookService"); // set the required policy for this route from("direct:fail").policy(required). setBody(constant("Tiger in Action")).beanRef("bookService"). setBody(constant("Donkey in Action")).beanRef("bookService"); As its a unit test we need to setup the database and this is easily done with Spring JdbcTemplate // create database and insert dummy data final DataSource ds = getMandatoryBean(DataSource.class, "dataSource"); jdbc = new JdbcTemplate(ds); jdbc.execute("create table books (title varchar(50))"); jdbc.update("insert into books (title) values (?)", new Object[] {"Camel in Action"}); And our core business service, the book service, will accept any books except the Donkeys. public class BookService { private SimpleJdbcTemplate jdbc; public BookService() { } public void setDataSource(DataSource ds) { jdbc = new SimpleJdbcTemplate(ds); } public void orderBook(String title) throws Exception { if (title.startsWith("Donkey")) { throw new IllegalArgumentException("We don't have Donkeys, only Camels"); } // create new local datasource to store in DB jdbc.update("insert into books (title) values (?)", title); } } Then we are ready to fire the tests. First to commit condition: public void testTransactionSuccess() throws Exception { template.sendBody("direct:okay", "Hello World"); int count = jdbc.queryForInt("select count(*) from books"); assertEquals("Number of books", 3, count); } And lastly the rollback condition since the 2nd book is a Donkey book: public void testTransactionRollback() throws Exception { template.sendBody("direct:fail", "Hello World"); int count = jdbc.queryForInt("select count(*) from books"); assertEquals("Number of books", 1, count); } JMS SampleIn this sample we want to listen for messages on a queue and process the messages with our business logic java code and send them along. Since its based on a unit test This time we want to setup the camel context and routes using the Spring XML syntax. <!-- here we define our camel context --> <camel:camelContext id="myroutes"> <!-- and now our route using the XML syntax --> <camel:route> <!-- 1: from the jms queue --> <camel:from uri="activemq:queue:okay"/> <!-- 2: setup the transactional boundaries to require a transaction --> <camel:policy ref="PROPAGATION_REQUIRED"/> <!-- 3: call our business logic that is myProcessor --> <camel:process ref="myProcessor"/> <!-- 4: if success then send it to the mock --> <camel:to uri="mock:result"/> </camel:route> </camel:camelContext> <!-- this bean is our business logic --> <bean id="myProcessor" class="org.apache.camel.component.jms.tx.JMSTransactionalClientTest$MyProcessor"/> Since the rest is standard XML stuff its nothing fancy now for the reader: <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="vm://localhost?broker.persistent=false&broker.useJmx=false"/> </bean> <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> <property name="connectionFactory" ref="jmsConnectionFactory"/> </bean> <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="jmsConnectionFactory"/> <property name="transactionManager" ref="jmsTransactionManager"/> <property name="transacted" value="true"/> <property name="concurrentConsumers" value="1"/> </bean> <bean id="activemq" class="org.apache.camel.component.jms.JmsComponent"> <property name="configuration" ref="jmsConfig"/> </bean> <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> <constructor-arg> <bean class="org.springframework.transaction.support.TransactionTemplate"> <property name="transactionManager" ref="jmsTransactionManager"/> </bean> </constructor-arg> </bean> Our business logic is set to handle the incomming messages and fail the first two times. When its a success it responds with a Bye World message. public static class MyProcessor implements Processor { private int count; public void process(Exchange exchange) throws Exception { if (++count <= 2) { throw new IllegalArgumentException("Forced Exception number " + count + ", please retry"); } exchange.getIn().setBody("Bye World"); exchange.getIn().setHeader("count", count); } } And our unit test is tested with this java code. Notice that we expect the Bye World message to be delivered at the 3rd attempt. MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); mock.expectedBodiesReceived("Bye World"); // success at 3rd attempt mock.message(0).header("count").isEqualTo(3); template.sendBody("activemq:queue:okay", "Hello World"); mock.assertIsSatisfied(); Spring based configurationIn Camel 1.4 we have introduced the concept of configuration of the error handlers using spring XML configuration. The sample below demonstrates that you can configure transaction error handlers in Spring XML as spring beans. These can then be set as global, per route based or per policy based error handler. The latter has been demonstrated in the samples above. This sample is the database sample configured in Spring XML. Notice that we have defnined two error handler, one per route. The first route uses the transaction error handler, and the 2nd uses no error handler at all. <!-- here we define our camel context --> <camel:camelContext id="myroutes"> <!-- first route with transaction error handler --> <!-- here we refer to our transaction error handler we define in this Spring XML file --> <!-- in this route the transactionErrorHandler is used --> <camel:route errorHandlerRef="transactionErrorHandler"> <!-- 1: from the jms queue --> <camel:from uri="activemq:queue:okay"/> <!-- 2: setup the transactional boundaries to require a transaction --> <camel:policy ref="required"/> <!-- 3: call our business logic that is myProcessor --> <camel:process ref="myProcessor"/> <!-- 4: if success then send it to the mock --> <camel:to uri="mock:result"/> </camel:route> <!-- 2nd route with no error handling --> <!-- this route doens't use error handler, in fact the spring bean with id noErrorHandler --> <camel:route errorHandlerRef="noErrorHandler"> <camel:from uri="activemq:queue:bad"/> <camel:to uri="log:bad"/> </camel:route> </camel:camelContext> The following snippet is the Spring XML configuration to setup the error handlers in pure spring XML: <!-- camel policy we refer to in our route --> <bean id="required" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> <property name="template" ref="PROPAGATION_REQUIRED"/> </bean> <!-- the standard spring transaction template for required --> <bean id="PROPAGATION_REQUIRED" class="org.springframework.transaction.support.TransactionTemplate"> <property name="transactionManager" ref="jmsTransactionManager"/> </bean> <!-- the transaction error handle we refer to from the route --> <bean id="transactionErrorHandler" class="org.apache.camel.spring.spi.TransactionErrorHandlerBuilder"> <property name="transactionTemplate" ref="PROPAGATION_REQUIRED"/> <!-- here we refer to the configurations of the error handler --> <property name="redeliveryPolicy" ref="redeliveryPolicyConfig"/> </bean> <!-- configuration of the transaction error handler --> <bean id="redeliveryPolicyConfig" class="org.apache.camel.processor.RedeliveryPolicy"> <!-- try up till 5 times --> <property name="maximumRedeliveries" value="5"/> <!-- wait 5 seconds at first redelivery --> <property name="initialRedeliveryDelay" value="5000"/> <!-- increas the wait time for each redelivery --> <property name="useExponentialBackOff" value="true"/> <!-- wait at most 30 seconds between redelivery --> <property name="maximumRedeliveryDelay" value="30000"/> </bean> <!-- the no error handler --> <bean id="noErrorHandler" class="org.apache.camel.builder.NoErrorHandlerBuilder"/> See AlsoUsing This PatternIf you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out. |
