001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.pool;
018    
019    import java.util.concurrent.atomic.AtomicBoolean;
020    
021    import javax.jms.Connection;
022    import javax.jms.ConnectionFactory;
023    import javax.jms.JMSException;
024    
025    import org.apache.activemq.ActiveMQConnection;
026    import org.apache.activemq.ActiveMQConnectionFactory;
027    import org.apache.activemq.Service;
028    import org.apache.activemq.util.JMSExceptionSupport;
029    import org.apache.commons.pool.KeyedObjectPool;
030    import org.apache.commons.pool.KeyedPoolableObjectFactory;
031    import org.apache.commons.pool.ObjectPoolFactory;
032    import org.apache.commons.pool.impl.GenericKeyedObjectPool;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * A JMS provider which pools Connection, Session and MessageProducer instances
038     * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's
039     * <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
040     * Connections, sessions and producers are returned to a pool after use so that they can be reused later
041     * without having to undergo the cost of creating them again.
042     *
043     * b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
044     * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
045     * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
046     * just created at startup and left active, handling incoming messages as they come. When a consumer is
047     * complete, it is best to close it rather than return it to a pool for later reuse: this is because,
048     * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
049     * where they'll get held until the consumer is active again.
050     *
051     * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
052     * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
053     * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
054     * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
055     *
056     * Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the
057     * pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should
058     * be used when configuring this optional feature. Eviction runs contend with client threads for access
059     * to objects in the pool, so if they run too frequently performance issues may result. The idle object
060     * eviction thread may be configured using the {@link setTimeBetweenExpirationCheckMillis} method.  By
061     * default the value is -1 which means no eviction thread will be run.  Set to a non-negative value to
062     * configure the idle eviction thread to run.
063     *
064     * @org.apache.xbean.XBean element="pooledConnectionFactory"
065     */
066    public class PooledConnectionFactory implements ConnectionFactory, Service {
067        private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
068    
069        private final AtomicBoolean stopped = new AtomicBoolean(false);
070        private final GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
071    
072        private ConnectionFactory connectionFactory;
073    
074        private int maximumActiveSessionPerConnection = 500;
075        private int idleTimeout = 30 * 1000;
076        private boolean blockIfSessionPoolIsFull = true;
077        private long expiryTimeout = 0l;
078        private boolean createConnectionOnStartup = true;
079    
080        /**
081         * Creates new PooledConnectionFactory with a default ActiveMQConnectionFactory instance.
082         * <p/>
083         * The URI used to connect to ActiveMQ comes from the default value of ActiveMQConnectionFactory.
084         */
085        public PooledConnectionFactory() {
086            this(new ActiveMQConnectionFactory());
087        }
088    
089        /**
090         * Creates a new PooledConnectionFactory that will use the given broker URI to connect to
091         * ActiveMQ.
092         *
093         * @param brokerURL
094         *      The URI to use to configure the internal ActiveMQConnectionFactory.
095         */
096        public PooledConnectionFactory(String brokerURL) {
097            this(new ActiveMQConnectionFactory(brokerURL));
098        }
099    
100        /**
101         * Creates a new PooledConnectionFactory that will use the given ActiveMQConnectionFactory to
102         * create new ActiveMQConnection instances that will be pooled.
103         *
104         * @param connectionFactory
105         *      The ActiveMQConnectionFactory to create new Connections for this pool.
106         */
107        public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
108            this.connectionFactory = connectionFactory;
109    
110            this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
111                new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() {
112    
113                    @Override
114                    public void activateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
115                    }
116    
117                    @Override
118                    public void destroyObject(ConnectionKey key, ConnectionPool connection) throws Exception {
119                        try {
120                            if (LOG.isTraceEnabled()) {
121                                LOG.trace("Destroying connection: {}", connection);
122                            }
123                            connection.close();
124                        } catch (Exception e) {
125                            LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
126                        }
127                    }
128    
129                    @Override
130                    public ConnectionPool makeObject(ConnectionKey key) throws Exception {
131                        ActiveMQConnection delegate = createConnection(key);
132    
133                        ConnectionPool connection = createConnectionPool(delegate);
134                        connection.setIdleTimeout(getIdleTimeout());
135                        connection.setExpiryTimeout(getExpiryTimeout());
136                        connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
137                        connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
138    
139                        if (LOG.isTraceEnabled()) {
140                            LOG.trace("Created new connection: {}", connection);
141                        }
142    
143                        return connection;
144                    }
145    
146                    @Override
147                    public void passivateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
148                    }
149    
150                    @Override
151                    public boolean validateObject(ConnectionKey key, ConnectionPool connection) {
152                        if (connection != null && connection.expiredCheck()) {
153                            if (LOG.isTraceEnabled()) {
154                                LOG.trace("Connection has expired: {} and will be destroyed", connection);
155                            }
156    
157                            return false;
158                        }
159    
160                        return true;
161                    }
162            });
163    
164            // Set max idle (not max active) since our connections always idle in the pool.
165            this.connectionsPool.setMaxIdle(1);
166    
167            // We always want our validate method to control when idle objects are evicted.
168            this.connectionsPool.setTestOnBorrow(true);
169            this.connectionsPool.setTestWhileIdle(true);
170        }
171    
172        /**
173         * @return the currently configured ConnectionFactory used to create the pooled Connections.
174         */
175        public ConnectionFactory getConnectionFactory() {
176            return connectionFactory;
177        }
178    
179        /**
180         * Sets the ConnectionFactory used to create new pooled Connections.
181         * <p/>
182         * Updates to this value do not affect Connections that were previously created and placed
183         * into the pool.  In order to allocate new Connections based off this new ConnectionFactory
184         * it is first necessary to {@link clear} the pooled Connections.
185         *
186         * @param connectionFactory
187         *      The factory to use to create pooled Connections.
188         */
189        public void setConnectionFactory(ConnectionFactory connectionFactory) {
190            this.connectionFactory = connectionFactory;
191        }
192    
193        @Override
194        public Connection createConnection() throws JMSException {
195            return createConnection(null, null);
196        }
197    
198        @Override
199        public synchronized Connection createConnection(String userName, String password) throws JMSException {
200            if (stopped.get()) {
201                LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
202                return null;
203            }
204    
205            ConnectionPool connection = null;
206            ConnectionKey key = new ConnectionKey(userName, password);
207    
208            // This will either return an existing non-expired ConnectionPool or it
209            // will create a new one to meet the demand.
210            if (connectionsPool.getNumIdle(key) < getMaxConnections()) {
211                try {
212                    // we want borrowObject to return the one we added.
213                    connectionsPool.setLifo(true);
214                    connectionsPool.addObject(key);
215                } catch (Exception e) {
216                    throw JMSExceptionSupport.create("Error while attempting to add new Connection to the pool", e);
217                }
218            } else {
219                // now we want the oldest one in the pool.
220                connectionsPool.setLifo(false);
221            }
222    
223            try {
224                connection = connectionsPool.borrowObject(key);
225            } catch (Exception e) {
226                throw JMSExceptionSupport.create("Error while attempting to retrieve a connection from the pool", e);
227            }
228    
229            try {
230                connectionsPool.returnObject(key, connection);
231            } catch (Exception e) {
232                throw JMSExceptionSupport.create("Error when returning connection to the pool", e);
233            }
234    
235            return new PooledConnection(connection);
236        }
237    
238        /**
239         * @deprecated
240         */
241        public ObjectPoolFactory<?> getPoolFactory() {
242            return null;
243        }
244    
245        protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
246            if (key.getUserName() == null && key.getPassword() == null) {
247                return (ActiveMQConnection)connectionFactory.createConnection();
248            } else {
249                return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword());
250            }
251        }
252    
253        @Override
254        public void start() {
255            LOG.debug("Staring the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
256            stopped.set(false);
257            if (isCreateConnectionOnStartup()) {
258                try {
259                    // warm the pool by creating a connection during startup
260                    createConnection();
261                } catch (JMSException e) {
262                    LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e);
263                }
264            }
265        }
266    
267        @Override
268        public void stop() {
269            LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
270                      connectionsPool.getNumActive());
271    
272            if (stopped.compareAndSet(false, true)) {
273                try {
274                    connectionsPool.close();
275                } catch (Exception e) {
276                }
277            }
278        }
279    
280        /**
281         * Clears all connections from the pool.  Each connection that is currently in the pool is
282         * closed and removed from the pool.  A new connection will be created on the next call to
283         * {@link createConnection}.  Care should be taken when using this method as Connections that
284         * are in use be client's will be closed.
285         */
286        public void clear() {
287    
288            if (stopped.get()) {
289                return;
290            }
291    
292            this.connectionsPool.clear();
293        }
294    
295        /**
296         * @deprecated use {@link #getMaximumActiveSessionPerConnection()}
297         */
298        @Deprecated
299        public int getMaximumActive() {
300            return getMaximumActiveSessionPerConnection();
301        }
302    
303        /**
304         * @deprecated use {@link #setMaximumActiveSessionPerConnection(int)}
305         */
306        @Deprecated
307        public void setMaximumActive(int maximumActive) {
308            setMaximumActiveSessionPerConnection(maximumActive);
309        }
310    
311        /**
312         * Returns the currently configured maximum number of sessions a pooled Connection will
313         * create before it either blocks or throws an exception when a new session is requested,
314         * depending on configuration.
315         *
316         * @return the number of session instances that can be taken from a pooled connection.
317         */
318        public int getMaximumActiveSessionPerConnection() {
319            return maximumActiveSessionPerConnection;
320        }
321    
322        /**
323         * Sets the maximum number of active sessions per connection
324         *
325         * @param maximumActiveSessionPerConnection
326         *      The maximum number of active session per connection in the pool.
327         */
328        public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
329            this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
330        }
331    
332        /**
333         * Controls the behavior of the internal session pool. By default the call to
334         * Connection.getSession() will block if the session pool is full.  If the
335         * argument false is given, it will change the default behavior and instead the
336         * call to getSession() will throw a JMSException.
337         *
338         * The size of the session pool is controlled by the @see #maximumActive
339         * property.
340         *
341         * @param block - if true, the call to getSession() blocks if the pool is full
342         * until a session object is available.  defaults to true.
343         */
344        public void setBlockIfSessionPoolIsFull(boolean block) {
345            this.blockIfSessionPoolIsFull = block;
346        }
347    
348        /**
349         * Returns whether a pooled Connection will enter a blocked state or will throw an Exception
350         * once the maximum number of sessions has been borrowed from the the Session Pool.
351         *
352         * @return true if the pooled Connection createSession method will block when the limit is hit.
353         * @see setBlockIfSessionPoolIsFull
354         */
355        public boolean isBlockIfSessionPoolIsFull() {
356            return this.blockIfSessionPoolIsFull;
357        }
358    
359        /**
360         * Returns the maximum number to pooled Connections that this factory will allow before it
361         * begins to return connections from the pool on calls to ({@link createConnection}.
362         *
363         * @return the maxConnections that will be created for this pool.
364         */
365        public int getMaxConnections() {
366            return connectionsPool.getMaxIdle();
367        }
368    
369        /**
370         * Sets the maximum number of pooled Connections (defaults to one).  Each call to
371         * {@link createConnection} will result in a new Connection being create up to the max
372         * connections value.
373         *
374         * @param maxConnections the maxConnections to set
375         */
376        public void setMaxConnections(int maxConnections) {
377            this.connectionsPool.setMaxIdle(maxConnections);
378        }
379    
380        /**
381         * Gets the Idle timeout value applied to new Connection's that are created by this pool.
382         * <p/>
383         * The idle timeout is used determine if a Connection instance has sat to long in the pool unused
384         * and if so is closed and removed from the pool.  The default value is 30 seconds.
385         *
386         * @return
387         */
388        public int getIdleTimeout() {
389            return idleTimeout;
390        }
391    
392        /**
393         * Sets the idle timeout value for Connection's that are created by this pool, defaults to 30 seconds.
394         * <p/>
395         * For a Connection that is in the pool but has no current users the idle timeout determines how
396         * long the Connection can live before it is eligible for removal from the pool.  Normally the
397         * connections are tested when an attempt to check one out occurs so a Connection instance can sit
398         * in the pool much longer than its idle timeout if connections are used infrequently.
399         *
400         *
401         * @param idleTimeout
402         *      The maximum time a pooled Connection can sit unused before it is eligible for removal.
403         */
404        public void setIdleTimeout(int idleTimeout) {
405            this.idleTimeout = idleTimeout;
406        }
407    
408        /**
409         * allow connections to expire, irrespective of load or idle time. This is useful with failover
410         * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
411         *
412         * @param expiryTimeout non zero in milliseconds
413         */
414        public void setExpiryTimeout(long expiryTimeout) {
415            this.expiryTimeout = expiryTimeout;
416        }
417    
418        /**
419         * @return the configured expiration timeout for connections in the pool.
420         */
421        public long getExpiryTimeout() {
422            return expiryTimeout;
423        }
424    
425        /**
426         * @return true if a Connection is created immediately on a call to {@link start}.
427         */
428        public boolean isCreateConnectionOnStartup() {
429            return createConnectionOnStartup;
430        }
431    
432        /**
433         * Whether to create a connection on starting this {@link PooledConnectionFactory}.
434         * <p/>
435         * This can be used to warm-up the pool on startup. Notice that any kind of exception
436         * happens during startup is logged at WARN level and ignored.
437         *
438         * @param createConnectionOnStartup <tt>true</tt> to create a connection on startup
439         */
440        public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
441            this.createConnectionOnStartup = createConnectionOnStartup;
442        }
443    
444        /**
445         * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys.
446         *
447         * @return this factories pool of ConnectionPool instances.
448         */
449        KeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
450            return this.connectionsPool;
451        }
452    
453        /**
454         * Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread.
455         * When non-positive, no idle object eviction thread will be run, and Connections will only be
456         * checked on borrow to determine if they have sat idle for too long or have failed for some
457         * other reason.
458         * <p/>
459         * By default this value is set to -1 and no expiration thread ever runs.
460         *
461         * @param timeBetweenExpirationCheckMillis
462         *      The time to wait between runs of the idle Connection eviction thread.
463         */
464        public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
465            this.connectionsPool.setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
466        }
467    
468        /**
469         * @return the number of milliseconds to sleep between runs of the idle connection eviction thread.
470         */
471        public long setTimeBetweenExpirationCheckMillis() {
472            return this.connectionsPool.getTimeBetweenEvictionRunsMillis();
473        }
474    
475        /**
476         * @return the number of Connections currently in the Pool
477         */
478        public int getNumConnections() {
479            return this.connectionsPool.getNumIdle();
480        }
481    
482        /**
483         * @deprecated
484         */
485        public void setPoolFactory(ObjectPoolFactory<?> factory) {
486        }
487    
488        /**
489         * Delegate that creates each instance of an ConnectionPool object.  Subclasses can override
490         * this method to customize the type of connection pool returned.
491         *
492         * @param connection
493         *
494         * @return instance of a new ConnectionPool.
495         */
496        protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
497            return new ConnectionPool(connection);
498        }
499    }