001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.pool;
018
019import java.io.IOException;
020import java.util.HashMap;
021import java.util.Properties;
022import javax.jms.Connection;
023import javax.jms.JMSException;
024import javax.jms.Session;
025import javax.jms.XAConnection;
026import javax.jms.XASession;
027import javax.naming.NamingException;
028import javax.naming.Reference;
029import javax.transaction.xa.XAResource;
030import org.apache.activemq.ActiveMQConnection;
031import org.apache.activemq.ActiveMQConnectionFactory;
032import org.apache.activemq.ActiveMQSession;
033import org.apache.activemq.ActiveMQXAConnectionFactory;
034import org.apache.activemq.Service;
035import org.apache.activemq.jms.pool.PooledSession;
036import org.apache.activemq.jms.pool.SessionKey;
037import org.apache.activemq.jms.pool.XaConnectionPool;
038import org.apache.activemq.jndi.JNDIReferenceFactory;
039import org.apache.activemq.jndi.JNDIStorableInterface;
040import org.apache.activemq.transport.TransportListener;
041import org.apache.activemq.util.IntrospectionSupport;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046  * Add Service and Referenceable and TransportListener to @link{org.apache.activemq.jms.pool.XaPooledConnectionFactory}
047  *
048  * @org.apache.xbean.XBean element=xaPooledConnectionFactory"
049  */
050public class XaPooledConnectionFactory extends org.apache.activemq.jms.pool.XaPooledConnectionFactory implements JNDIStorableInterface, Service {
051    public static final String POOL_PROPS_PREFIX = "pool";
052    private static final transient Logger LOG = LoggerFactory.getLogger(org.apache.activemq.jms.pool.XaPooledConnectionFactory.class);
053    private String brokerUrl;
054
055    public XaPooledConnectionFactory() {
056        super();
057    }
058
059    public XaPooledConnectionFactory(ActiveMQXAConnectionFactory connectionFactory) {
060        setConnectionFactory(connectionFactory);
061    }
062
063    @Override
064    protected org.apache.activemq.jms.pool.ConnectionPool createConnectionPool(Connection connection) {
065        return new XaConnectionPool(connection, getTransactionManager()) {
066
067            @Override
068            protected Session makeSession(SessionKey key) throws JMSException {
069                if (connection instanceof XAConnection) {
070                    return ((XAConnection)connection).createXASession();
071                } else {
072                    return connection.createSession(key.isTransacted(), key.getAckMode());
073                }
074            }
075
076            @Override
077            protected XAResource createXaResource(PooledSession session) throws JMSException {
078                if (session.getInternalSession() instanceof XASession) {
079                    return ((XASession)session.getInternalSession()).getXAResource();
080                } else {
081                    return ((ActiveMQSession)session.getInternalSession()).getTransactionContext();
082                }
083            }
084
085
086            @Override
087            protected Connection wrap(final Connection connection) {
088                // Add a transport Listener so that we can notice if this connection
089                // should be expired due to a connection failure.
090                ((ActiveMQConnection)connection).addTransportListener(new TransportListener() {
091                    @Override
092                    public void onCommand(Object command) {
093                    }
094
095                    @Override
096                    public void onException(IOException error) {
097                        synchronized (this) {
098                            setHasExpired(true);
099                            // only log if not stopped
100                            if (!stopped.get()) {
101                                LOG.info("Expiring connection " + connection + " on IOException: " + error.getMessage());
102                                // log stacktrace at debug level
103                                LOG.debug("Expiring connection " + connection + " on IOException: ", error);
104                            }
105                        }
106                    }
107
108                    @Override
109                    public void transportInterupted() {
110                    }
111
112                    @Override
113                    public void transportResumed() {
114                    }
115                });
116
117                // make sure that we set the hasFailed flag, in case the transport already failed
118                // prior to the addition of our new TransportListener
119                setHasExpired(((ActiveMQConnection) connection).isTransportFailed());
120
121                // may want to return an amq EnhancedConnection
122                return connection;
123            }
124
125            @Override
126            protected void unWrap(Connection connection) {
127                if (connection != null) {
128                    ((ActiveMQConnection)connection).cleanUpTempDestinations();
129                }
130            }
131        };
132    }
133
134    protected void buildFromProperties(Properties props) {
135        ActiveMQConnectionFactory activeMQConnectionFactory = props.containsKey("xaAckMode") ?
136                new ActiveMQXAConnectionFactory() : new ActiveMQConnectionFactory();
137        activeMQConnectionFactory.buildFromProperties(props);
138        setConnectionFactory(activeMQConnectionFactory);
139        IntrospectionSupport.setProperties(this, new HashMap(props), POOL_PROPS_PREFIX);
140    }
141
142    protected void populateProperties(Properties props) {
143        ((ActiveMQConnectionFactory)getConnectionFactory()).populateProperties(props);
144        IntrospectionSupport.getProperties(this, props, POOL_PROPS_PREFIX);
145    }
146
147    @Override
148    public void setProperties(Properties properties) {
149        buildFromProperties(properties);
150    }
151
152    @Override
153    public Properties getProperties() {
154        Properties properties = new Properties();
155        populateProperties(properties);
156        return properties;
157    }
158
159    @Override
160    public Reference getReference() throws NamingException {
161        return JNDIReferenceFactory.createReference(this.getClass().getName(), this);
162    }
163
164    public void setBrokerUrl(String url) {
165        if (brokerUrl == null || !brokerUrl.equals(url)) {
166            brokerUrl = url;
167            setConnectionFactory(new ActiveMQXAConnectionFactory(brokerUrl));
168        }
169    }
170
171    public String getBrokerUrl() {
172        return brokerUrl;
173    }
174}