001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.pool;
018    
019    import java.io.IOException;
020    import java.util.HashMap;
021    import java.util.Properties;
022    import javax.jms.Connection;
023    import javax.naming.NamingException;
024    import javax.naming.Reference;
025    import org.apache.activemq.ActiveMQConnection;
026    import org.apache.activemq.ActiveMQConnectionFactory;
027    import org.apache.activemq.Service;
028    import org.apache.activemq.jms.pool.ConnectionPool;
029    import org.apache.activemq.jndi.JNDIReferenceFactory;
030    import org.apache.activemq.jndi.JNDIStorableInterface;
031    import org.apache.activemq.transport.TransportListener;
032    import org.apache.activemq.util.IntrospectionSupport;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * Add Service and Referenceable and TransportListener to @link{org.apache.activemq.jms.pool.PooledConnectionFactory}
038     *
039     * @org.apache.xbean.XBean element="pooledConnectionFactory"
040     */
041    public class PooledConnectionFactory extends org.apache.activemq.jms.pool.PooledConnectionFactory implements JNDIStorableInterface, Service {
042        public static final String POOL_PROPS_PREFIX = "pool";
043    
044        private static final transient Logger LOG = LoggerFactory.getLogger(org.apache.activemq.jms.pool.PooledConnectionFactory.class);
045    
046        public PooledConnectionFactory() {
047            super();
048        }
049    
050        public PooledConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
051            setConnectionFactory(activeMQConnectionFactory);
052        }
053    
054        public PooledConnectionFactory(String brokerURL) {
055            setConnectionFactory(new ActiveMQConnectionFactory(brokerURL));
056        }
057    
058        protected void buildFromProperties(Properties props) {
059            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
060            activeMQConnectionFactory.buildFromProperties(props);
061            setConnectionFactory(activeMQConnectionFactory);
062            IntrospectionSupport.setProperties(this, new HashMap(props), POOL_PROPS_PREFIX);
063        }
064    
065        protected void populateProperties(Properties props) {
066            ((ActiveMQConnectionFactory)getConnectionFactory()).populateProperties(props);
067            IntrospectionSupport.getProperties(this, props, POOL_PROPS_PREFIX);
068        }
069    
070        @Override
071        public void setProperties(Properties properties) {
072            buildFromProperties(properties);
073        }
074    
075        @Override
076        public Properties getProperties() {
077            Properties properties = new Properties();
078            populateProperties(properties);
079            return properties;
080        }
081    
082    
083        @Override
084        public Reference getReference() throws NamingException {
085            return JNDIReferenceFactory.createReference(this.getClass().getName(), this);
086        }
087    
088        @Override
089        protected Connection newPooledConnection(ConnectionPool connection) {
090            return new PooledConnection(connection);
091        }
092    
093        @Override
094        protected org.apache.activemq.jms.pool.ConnectionPool createConnectionPool(Connection connection) {
095            return new ConnectionPool(connection) {
096    
097                @Override
098                protected Connection wrap(final Connection connection) {
099                    // Add a transport Listener so that we can notice if this connection
100                    // should be expired due to a connection failure.
101                    ((ActiveMQConnection)connection).addTransportListener(new TransportListener() {
102                        @Override
103                        public void onCommand(Object command) {
104                        }
105    
106                        @Override
107                        public void onException(IOException error) {
108                            synchronized (this) {
109                                setHasExpired(true);
110                                LOG.info("Expiring connection {} on IOException: {}" , connection, error);
111                                LOG.debug("Expiring connection on IOException", error);
112                            }
113                        }
114    
115                        @Override
116                        public void transportInterupted() {
117                        }
118    
119                        @Override
120                        public void transportResumed() {
121                        }
122                    });
123    
124                    // make sure that we set the hasFailed flag, in case the transport already failed
125                    // prior to the addition of our new TransportListener
126                    setHasExpired(((ActiveMQConnection)connection).isTransportFailed());
127    
128                    // may want to return an amq EnhancedConnection
129                    return connection;
130                }
131    
132                @Override
133                protected void unWrap(Connection connection) {
134                    if (connection != null) {
135                        ((ActiveMQConnection)connection).cleanUpTempDestinations();
136                    }
137                }
138            };
139        }
140    
141    }