001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.pool;
018    
019    import java.io.IOException;
020    
021    import javax.jms.ConnectionFactory;
022    import javax.jms.Session;
023    import javax.jms.JMSException;
024    import javax.transaction.SystemException;
025    import javax.transaction.TransactionManager;
026    
027    import javax.transaction.xa.XAResource;
028    import org.apache.geronimo.transaction.manager.NamedXAResourceFactory;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    import org.apache.activemq.ActiveMQConnectionFactory;
032    import org.apache.activemq.ActiveMQConnection;
033    import org.apache.activemq.ActiveMQSession;
034    import org.apache.activemq.util.IOExceptionSupport;
035    import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
036    import org.apache.geronimo.transaction.manager.NamedXAResource;
037    import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
038    
039    
040    /**
041     * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
042     * in a way that will allow the transaction manager to correctly recover XA transactions.
043     *
044     * For example, it can be used the following way:
045     * <pre>
046     *   <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
047     *      <property name="brokerURL" value="tcp://localhost:61616" />
048     *   </bean>
049     *
050     *   <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean">
051     *       <property name="maxConnections" value="8" />
052     *       <property name="transactionManager" ref="transactionManager" />
053     *       <property name="connectionFactory" ref="activemqConnectionFactory" />
054     *       <property name="resourceName" value="activemq.broker" />
055     *   </bean>
056     *
057     *   <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
058     *         <property name="transactionManager" ref="transactionManager" />
059     *         <property name="connectionFactory" ref="activemqConnectionFactory" />
060     *         <property name="resourceName" value="activemq.broker" />
061     *   </bean>
062     * </pre>
063     */
064    public class ActiveMQResourceManager {
065    
066        private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQResourceManager.class);
067    
068        private String resourceName;
069    
070        private TransactionManager transactionManager;
071    
072        private ConnectionFactory connectionFactory;
073    
074        public void recoverResource() {
075            try {
076                if (!Recovery.recover(this)) {
077                    LOGGER.info("Resource manager is unrecoverable");
078                }
079            } catch (NoClassDefFoundError e) {
080                LOGGER.info("Resource manager is unrecoverable due to missing classes: " + e);
081            } catch (Throwable e) {
082                LOGGER.warn("Error while recovering resource manager", e);
083            }
084        }
085    
086        public String getResourceName() {
087            return resourceName;
088        }
089    
090        public void setResourceName(String resourceName) {
091            this.resourceName = resourceName;
092        }
093    
094        public TransactionManager getTransactionManager() {
095            return transactionManager;
096        }
097    
098        public void setTransactionManager(TransactionManager transactionManager) {
099            this.transactionManager = transactionManager;
100        }
101    
102        public ConnectionFactory getConnectionFactory() {
103            return connectionFactory;
104        }
105    
106        public void setConnectionFactory(ConnectionFactory connectionFactory) {
107            this.connectionFactory = connectionFactory;
108        }
109    
110        /**
111         * This class will ensure the broker is properly recovered when wired with
112         * the Geronimo transaction manager.
113         */
114        public static class Recovery {
115    
116            public static boolean isRecoverable(ActiveMQResourceManager rm) {
117                return  rm.getConnectionFactory() instanceof ActiveMQConnectionFactory &&
118                        rm.getTransactionManager() instanceof RecoverableTransactionManager &&
119                        rm.getResourceName() != null && !"".equals(rm.getResourceName());
120            }
121    
122            public static boolean recover(final ActiveMQResourceManager rm) throws IOException {
123                if (isRecoverable(rm)) {
124                    try {
125                        final ActiveMQConnectionFactory connFactory = (ActiveMQConnectionFactory) rm.getConnectionFactory();
126                        ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
127                        final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
128                        NamedXAResource namedXaResource = new WrapperNamedXAResource(session.getTransactionContext(), rm.getResourceName());
129    
130                        RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager();
131                        rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() {
132    
133                            @Override
134                            public String getName() {
135                                return rm.getResourceName();
136                            }
137    
138                            @Override
139                            public NamedXAResource getNamedXAResource() throws SystemException {
140                                try {
141                                    final ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
142                                    final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
143                                    activeConn.start();
144                                    LOGGER.debug("new namedXAResource's connection: " + activeConn);
145    
146                                    return new ConnectionAndWrapperNamedXAResource(session.getTransactionContext(), getName(), activeConn);
147                                } catch (Exception e) {
148                                    SystemException se =  new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
149                                    se.initCause(e);
150                                    LOGGER.error(se.getLocalizedMessage(), se);
151                                    throw se;
152                                }
153                            }
154    
155                            @Override
156                            public void returnNamedXAResource(NamedXAResource namedXaResource) {
157                                if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) {
158                                    try {
159                                        LOGGER.debug("closing returned namedXAResource's connection: " + ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection);
160                                        ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection.close();
161                                    } catch (Exception ignored) {
162                                        LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored);
163                                    }
164                                }
165                            }
166                        });
167                        return true;
168                    } catch (JMSException e) {
169                      throw IOExceptionSupport.create(e);
170                    }
171                } else {
172                    return false;
173                }
174            }
175        }
176    
177        public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource {
178            final ActiveMQConnection connection;
179            public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, ActiveMQConnection connection) {
180                super(xaResource, name);
181                this.connection = connection;
182            }
183        }
184    }