001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.pool;
018
019 import java.util.concurrent.atomic.AtomicBoolean;
020
021 import javax.jms.Connection;
022 import javax.jms.ConnectionFactory;
023 import javax.jms.JMSException;
024
025 import org.apache.activemq.ActiveMQConnection;
026 import org.apache.activemq.ActiveMQConnectionFactory;
027 import org.apache.activemq.Service;
028 import org.apache.activemq.util.JMSExceptionSupport;
029 import org.apache.commons.pool.KeyedObjectPool;
030 import org.apache.commons.pool.KeyedPoolableObjectFactory;
031 import org.apache.commons.pool.ObjectPoolFactory;
032 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
033 import org.slf4j.Logger;
034 import org.slf4j.LoggerFactory;
035
036 /**
037 * A JMS provider which pools Connection, Session and MessageProducer instances
038 * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's
039 * <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
040 * Connections, sessions and producers are returned to a pool after use so that they can be reused later
041 * without having to undergo the cost of creating them again.
042 *
043 * b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
044 * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
045 * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
046 * just created at startup and left active, handling incoming messages as they come. When a consumer is
047 * complete, it is best to close it rather than return it to a pool for later reuse: this is because,
048 * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
049 * where they'll get held until the consumer is active again.
050 *
051 * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
052 * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
053 * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
054 * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
055 *
056 * Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the
057 * pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should
058 * be used when configuring this optional feature. Eviction runs contend with client threads for access
059 * to objects in the pool, so if they run too frequently performance issues may result. The idle object
060 * eviction thread may be configured using the {@link setTimeBetweenExpirationCheckMillis} method. By
061 * default the value is -1 which means no eviction thread will be run. Set to a non-negative value to
062 * configure the idle eviction thread to run.
063 *
064 * @org.apache.xbean.XBean element="pooledConnectionFactory"
065 */
066 public class PooledConnectionFactory implements ConnectionFactory, Service {
067 private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
068
069 private final AtomicBoolean stopped = new AtomicBoolean(false);
070 private final GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
071
072 private ConnectionFactory connectionFactory;
073
074 private int maximumActiveSessionPerConnection = 500;
075 private int idleTimeout = 30 * 1000;
076 private boolean blockIfSessionPoolIsFull = true;
077 private long expiryTimeout = 0l;
078 private boolean createConnectionOnStartup = true;
079
080 /**
081 * Creates new PooledConnectionFactory with a default ActiveMQConnectionFactory instance.
082 * <p/>
083 * The URI used to connect to ActiveMQ comes from the default value of ActiveMQConnectionFactory.
084 */
085 public PooledConnectionFactory() {
086 this(new ActiveMQConnectionFactory());
087 }
088
089 /**
090 * Creates a new PooledConnectionFactory that will use the given broker URI to connect to
091 * ActiveMQ.
092 *
093 * @param brokerURL
094 * The URI to use to configure the internal ActiveMQConnectionFactory.
095 */
096 public PooledConnectionFactory(String brokerURL) {
097 this(new ActiveMQConnectionFactory(brokerURL));
098 }
099
100 /**
101 * Creates a new PooledConnectionFactory that will use the given ActiveMQConnectionFactory to
102 * create new ActiveMQConnection instances that will be pooled.
103 *
104 * @param connectionFactory
105 * The ActiveMQConnectionFactory to create new Connections for this pool.
106 */
107 public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
108 this.connectionFactory = connectionFactory;
109
110 this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
111 new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() {
112
113 @Override
114 public void activateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
115 }
116
117 @Override
118 public void destroyObject(ConnectionKey key, ConnectionPool connection) throws Exception {
119 try {
120 if (LOG.isTraceEnabled()) {
121 LOG.trace("Destroying connection: {}", connection);
122 }
123 connection.close();
124 } catch (Exception e) {
125 LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
126 }
127 }
128
129 @Override
130 public ConnectionPool makeObject(ConnectionKey key) throws Exception {
131 ActiveMQConnection delegate = createConnection(key);
132
133 ConnectionPool connection = createConnectionPool(delegate);
134 connection.setIdleTimeout(getIdleTimeout());
135 connection.setExpiryTimeout(getExpiryTimeout());
136 connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
137 connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
138
139 if (LOG.isTraceEnabled()) {
140 LOG.trace("Created new connection: {}", connection);
141 }
142
143 return connection;
144 }
145
146 @Override
147 public void passivateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
148 }
149
150 @Override
151 public boolean validateObject(ConnectionKey key, ConnectionPool connection) {
152 if (connection != null && connection.expiredCheck()) {
153 if (LOG.isTraceEnabled()) {
154 LOG.trace("Connection has expired: {} and will be destroyed", connection);
155 }
156
157 return false;
158 }
159
160 return true;
161 }
162 });
163
164 // Set max idle (not max active) since our connections always idle in the pool.
165 this.connectionsPool.setMaxIdle(1);
166
167 // We always want our validate method to control when idle objects are evicted.
168 this.connectionsPool.setTestOnBorrow(true);
169 this.connectionsPool.setTestWhileIdle(true);
170 }
171
172 /**
173 * @return the currently configured ConnectionFactory used to create the pooled Connections.
174 */
175 public ConnectionFactory getConnectionFactory() {
176 return connectionFactory;
177 }
178
179 /**
180 * Sets the ConnectionFactory used to create new pooled Connections.
181 * <p/>
182 * Updates to this value do not affect Connections that were previously created and placed
183 * into the pool. In order to allocate new Connections based off this new ConnectionFactory
184 * it is first necessary to {@link clear} the pooled Connections.
185 *
186 * @param connectionFactory
187 * The factory to use to create pooled Connections.
188 */
189 public void setConnectionFactory(ConnectionFactory connectionFactory) {
190 this.connectionFactory = connectionFactory;
191 }
192
193 @Override
194 public Connection createConnection() throws JMSException {
195 return createConnection(null, null);
196 }
197
198 @Override
199 public synchronized Connection createConnection(String userName, String password) throws JMSException {
200 if (stopped.get()) {
201 LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
202 return null;
203 }
204
205 ConnectionPool connection = null;
206 ConnectionKey key = new ConnectionKey(userName, password);
207
208 // This will either return an existing non-expired ConnectionPool or it
209 // will create a new one to meet the demand.
210 if (connectionsPool.getNumIdle(key) < getMaxConnections()) {
211 try {
212 // we want borrowObject to return the one we added.
213 connectionsPool.setLifo(true);
214 connectionsPool.addObject(key);
215 } catch (Exception e) {
216 throw JMSExceptionSupport.create("Error while attempting to add new Connection to the pool", e);
217 }
218 } else {
219 // now we want the oldest one in the pool.
220 connectionsPool.setLifo(false);
221 }
222
223 try {
224 connection = connectionsPool.borrowObject(key);
225 } catch (Exception e) {
226 throw JMSExceptionSupport.create("Error while attempting to retrieve a connection from the pool", e);
227 }
228
229 try {
230 connectionsPool.returnObject(key, connection);
231 } catch (Exception e) {
232 throw JMSExceptionSupport.create("Error when returning connection to the pool", e);
233 }
234
235 return new PooledConnection(connection);
236 }
237
238 /**
239 * @deprecated
240 */
241 public ObjectPoolFactory<?> getPoolFactory() {
242 return null;
243 }
244
245 protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
246 if (key.getUserName() == null && key.getPassword() == null) {
247 return (ActiveMQConnection)connectionFactory.createConnection();
248 } else {
249 return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword());
250 }
251 }
252
253 @Override
254 public void start() {
255 LOG.debug("Staring the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
256 stopped.set(false);
257 if (isCreateConnectionOnStartup()) {
258 try {
259 // warm the pool by creating a connection during startup
260 createConnection();
261 } catch (JMSException e) {
262 LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e);
263 }
264 }
265 }
266
267 @Override
268 public void stop() {
269 LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
270 connectionsPool.getNumActive());
271
272 if (stopped.compareAndSet(false, true)) {
273 try {
274 connectionsPool.close();
275 } catch (Exception e) {
276 }
277 }
278 }
279
280 /**
281 * Clears all connections from the pool. Each connection that is currently in the pool is
282 * closed and removed from the pool. A new connection will be created on the next call to
283 * {@link createConnection}. Care should be taken when using this method as Connections that
284 * are in use be client's will be closed.
285 */
286 public void clear() {
287
288 if (stopped.get()) {
289 return;
290 }
291
292 this.connectionsPool.clear();
293 }
294
295 /**
296 * @deprecated use {@link #getMaximumActiveSessionPerConnection()}
297 */
298 @Deprecated
299 public int getMaximumActive() {
300 return getMaximumActiveSessionPerConnection();
301 }
302
303 /**
304 * @deprecated use {@link #setMaximumActiveSessionPerConnection(int)}
305 */
306 @Deprecated
307 public void setMaximumActive(int maximumActive) {
308 setMaximumActiveSessionPerConnection(maximumActive);
309 }
310
311 /**
312 * Returns the currently configured maximum number of sessions a pooled Connection will
313 * create before it either blocks or throws an exception when a new session is requested,
314 * depending on configuration.
315 *
316 * @return the number of session instances that can be taken from a pooled connection.
317 */
318 public int getMaximumActiveSessionPerConnection() {
319 return maximumActiveSessionPerConnection;
320 }
321
322 /**
323 * Sets the maximum number of active sessions per connection
324 *
325 * @param maximumActiveSessionPerConnection
326 * The maximum number of active session per connection in the pool.
327 */
328 public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
329 this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
330 }
331
332 /**
333 * Controls the behavior of the internal session pool. By default the call to
334 * Connection.getSession() will block if the session pool is full. If the
335 * argument false is given, it will change the default behavior and instead the
336 * call to getSession() will throw a JMSException.
337 *
338 * The size of the session pool is controlled by the @see #maximumActive
339 * property.
340 *
341 * @param block - if true, the call to getSession() blocks if the pool is full
342 * until a session object is available. defaults to true.
343 */
344 public void setBlockIfSessionPoolIsFull(boolean block) {
345 this.blockIfSessionPoolIsFull = block;
346 }
347
348 /**
349 * Returns whether a pooled Connection will enter a blocked state or will throw an Exception
350 * once the maximum number of sessions has been borrowed from the the Session Pool.
351 *
352 * @return true if the pooled Connection createSession method will block when the limit is hit.
353 * @see setBlockIfSessionPoolIsFull
354 */
355 public boolean isBlockIfSessionPoolIsFull() {
356 return this.blockIfSessionPoolIsFull;
357 }
358
359 /**
360 * Returns the maximum number to pooled Connections that this factory will allow before it
361 * begins to return connections from the pool on calls to ({@link createConnection}.
362 *
363 * @return the maxConnections that will be created for this pool.
364 */
365 public int getMaxConnections() {
366 return connectionsPool.getMaxIdle();
367 }
368
369 /**
370 * Sets the maximum number of pooled Connections (defaults to one). Each call to
371 * {@link createConnection} will result in a new Connection being create up to the max
372 * connections value.
373 *
374 * @param maxConnections the maxConnections to set
375 */
376 public void setMaxConnections(int maxConnections) {
377 this.connectionsPool.setMaxIdle(maxConnections);
378 }
379
380 /**
381 * Gets the Idle timeout value applied to new Connection's that are created by this pool.
382 * <p/>
383 * The idle timeout is used determine if a Connection instance has sat to long in the pool unused
384 * and if so is closed and removed from the pool. The default value is 30 seconds.
385 *
386 * @return
387 */
388 public int getIdleTimeout() {
389 return idleTimeout;
390 }
391
392 /**
393 * Sets the idle timeout value for Connection's that are created by this pool, defaults to 30 seconds.
394 * <p/>
395 * For a Connection that is in the pool but has no current users the idle timeout determines how
396 * long the Connection can live before it is eligible for removal from the pool. Normally the
397 * connections are tested when an attempt to check one out occurs so a Connection instance can sit
398 * in the pool much longer than its idle timeout if connections are used infrequently.
399 *
400 *
401 * @param idleTimeout
402 * The maximum time a pooled Connection can sit unused before it is eligible for removal.
403 */
404 public void setIdleTimeout(int idleTimeout) {
405 this.idleTimeout = idleTimeout;
406 }
407
408 /**
409 * allow connections to expire, irrespective of load or idle time. This is useful with failover
410 * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
411 *
412 * @param expiryTimeout non zero in milliseconds
413 */
414 public void setExpiryTimeout(long expiryTimeout) {
415 this.expiryTimeout = expiryTimeout;
416 }
417
418 /**
419 * @return the configured expiration timeout for connections in the pool.
420 */
421 public long getExpiryTimeout() {
422 return expiryTimeout;
423 }
424
425 /**
426 * @return true if a Connection is created immediately on a call to {@link start}.
427 */
428 public boolean isCreateConnectionOnStartup() {
429 return createConnectionOnStartup;
430 }
431
432 /**
433 * Whether to create a connection on starting this {@link PooledConnectionFactory}.
434 * <p/>
435 * This can be used to warm-up the pool on startup. Notice that any kind of exception
436 * happens during startup is logged at WARN level and ignored.
437 *
438 * @param createConnectionOnStartup <tt>true</tt> to create a connection on startup
439 */
440 public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
441 this.createConnectionOnStartup = createConnectionOnStartup;
442 }
443
444 /**
445 * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys.
446 *
447 * @return this factories pool of ConnectionPool instances.
448 */
449 KeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
450 return this.connectionsPool;
451 }
452
453 /**
454 * Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread.
455 * When non-positive, no idle object eviction thread will be run, and Connections will only be
456 * checked on borrow to determine if they have sat idle for too long or have failed for some
457 * other reason.
458 * <p/>
459 * By default this value is set to -1 and no expiration thread ever runs.
460 *
461 * @param timeBetweenExpirationCheckMillis
462 * The time to wait between runs of the idle Connection eviction thread.
463 */
464 public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
465 this.connectionsPool.setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
466 }
467
468 /**
469 * @return the number of milliseconds to sleep between runs of the idle connection eviction thread.
470 */
471 public long setTimeBetweenExpirationCheckMillis() {
472 return this.connectionsPool.getTimeBetweenEvictionRunsMillis();
473 }
474
475 /**
476 * @return the number of Connections currently in the Pool
477 */
478 public int getNumConnections() {
479 return this.connectionsPool.getNumIdle();
480 }
481
482 /**
483 * @deprecated
484 */
485 public void setPoolFactory(ObjectPoolFactory<?> factory) {
486 }
487
488 /**
489 * Delegate that creates each instance of an ConnectionPool object. Subclasses can override
490 * this method to customize the type of connection pool returned.
491 *
492 * @param connection
493 *
494 * @return instance of a new ConnectionPool.
495 */
496 protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
497 return new ConnectionPool(connection);
498 }
499 }