001/*
002 * Copyright 2006 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.apache.activemq.pool;
017
018import java.io.IOException;
019import javax.jms.Connection;
020import org.apache.activemq.ActiveMQConnection;
021import org.apache.activemq.jms.pool.ConnectionPool;
022import org.apache.activemq.jms.pool.JcaConnectionPool;
023import org.apache.activemq.transport.TransportListener;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027public class JcaPooledConnectionFactory extends XaPooledConnectionFactory {
028    private static final transient Logger LOG = LoggerFactory.getLogger(JcaPooledConnectionFactory.class);
029
030    private String name;
031
032    public String getName() {
033        return name;
034    }
035
036    public void setName(String name) {
037        this.name = name;
038    }
039
040    protected ConnectionPool createConnectionPool(Connection connection) {
041        return new JcaConnectionPool(connection, getTransactionManager(), getName()) {
042
043            @Override
044            protected Connection wrap(final Connection connection) {
045                // Add a transport Listener so that we can notice if this connection
046                // should be expired due to a connection failure.
047                ((ActiveMQConnection)connection).addTransportListener(new TransportListener() {
048                    @Override
049                    public void onCommand(Object command) {
050                    }
051
052                    @Override
053                    public void onException(IOException error) {
054                        synchronized (this) {
055                            setHasExpired(true);
056                            // only log if not stopped
057                            if (!stopped.get()) {
058                                LOG.info("Expiring connection " + connection + " on IOException: " + error.getMessage());
059                                // log stacktrace at debug level
060                                LOG.debug("Expiring connection " + connection + " on IOException: ", error);
061                            }
062                        }
063                    }
064
065                    @Override
066                    public void transportInterupted() {
067                    }
068
069                    @Override
070                    public void transportResumed() {
071                    }
072                });
073
074                // make sure that we set the hasFailed flag, in case the transport already failed
075                // prior to the addition of our new TransportListener
076                setHasExpired(((ActiveMQConnection) connection).isTransportFailed());
077
078                // may want to return an amq EnhancedConnection
079                return connection;
080            }
081
082            @Override
083            protected void unWrap(Connection connection) {
084                if (connection != null) {
085                    ((ActiveMQConnection)connection).cleanUpTempDestinations();
086                }
087            }
088        };
089    }
090}