001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.pool;
019    
020    import java.io.IOException;
021    import java.util.List;
022    import java.util.concurrent.CopyOnWriteArrayList;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    
025    import javax.jms.JMSException;
026    import javax.jms.Session;
027    
028    import org.apache.activemq.ActiveMQConnection;
029    import org.apache.activemq.ActiveMQSession;
030    import org.apache.activemq.transport.TransportListener;
031    import org.apache.activemq.util.JMSExceptionSupport;
032    import org.apache.commons.pool.KeyedPoolableObjectFactory;
033    import org.apache.commons.pool.impl.GenericKeyedObjectPool;
034    import org.apache.commons.pool.impl.GenericObjectPool;
035    
036    /**
037     * Holds a real JMS connection along with the session pools associated with it.
038     * <p/>
039     * Instances of this class are shared amongst one or more PooledConnection object and must
040     * track the session objects that are loaned out for cleanup on close as well as ensuring
041     * that the temporary destinations of the managed Connection are purged when all references
042     * to this ConnectionPool are released.
043     */
044    public class ConnectionPool {
045    
046        private ActiveMQConnection connection;
047        private int referenceCount;
048        private long lastUsed = System.currentTimeMillis();
049        private long firstUsed = lastUsed;
050        private boolean hasFailed;
051        private boolean hasExpired;
052        private int idleTimeout = 30 * 1000;
053        private long expiryTimeout = 0l;
054    
055        private final AtomicBoolean started = new AtomicBoolean(false);
056        private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool;
057        private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
058    
059        public ConnectionPool(ActiveMQConnection connection) {
060    
061            this.connection = connection;
062    
063            // Add a transport Listener so that we can notice if this connection
064            // should be expired due to a connection failure.
065            connection.addTransportListener(new TransportListener() {
066                public void onCommand(Object command) {
067                }
068    
069                public void onException(IOException error) {
070                    synchronized (ConnectionPool.this) {
071                        hasFailed = true;
072                    }
073                }
074    
075                public void transportInterupted() {
076                }
077    
078                public void transportResumed() {
079                }
080            });
081    
082            // make sure that we set the hasFailed flag, in case the transport already failed
083            // prior to the addition of our new TransportListener
084            if(connection.isTransportFailed()) {
085                hasFailed = true;
086            }
087    
088            // Create our internal Pool of session instances.
089            this.sessionPool = new GenericKeyedObjectPool<SessionKey, PooledSession>(
090                new KeyedPoolableObjectFactory<SessionKey, PooledSession>() {
091    
092                    @Override
093                    public void activateObject(SessionKey key, PooledSession session) throws Exception {
094                        ConnectionPool.this.loanedSessions.add(session);
095                    }
096    
097                    @Override
098                    public void destroyObject(SessionKey key, PooledSession session) throws Exception {
099                        ConnectionPool.this.loanedSessions.remove(session);
100                        session.getInternalSession().close();
101                    }
102    
103                    @Override
104                    public PooledSession makeObject(SessionKey key) throws Exception {
105                        ActiveMQSession session = (ActiveMQSession)
106                                ConnectionPool.this.connection.createSession(key.isTransacted(), key.getAckMode());
107                        return new PooledSession(key, session, sessionPool);
108                    }
109    
110                    @Override
111                    public void passivateObject(SessionKey key, PooledSession session) throws Exception {
112                        ConnectionPool.this.loanedSessions.remove(session);
113                    }
114    
115                    @Override
116                    public boolean validateObject(SessionKey key, PooledSession session) {
117                        return true;
118                    }
119                }
120            );
121        }
122    
123        public void start() throws JMSException {
124            if (started.compareAndSet(false, true)) {
125                try {
126                    connection.start();
127                } catch (JMSException e) {
128                    started.set(false);
129                    throw(e);
130                }
131            }
132        }
133    
134        public synchronized ActiveMQConnection getConnection() {
135            return connection;
136        }
137    
138        public Session createSession(boolean transacted, int ackMode) throws JMSException {
139            SessionKey key = new SessionKey(transacted, ackMode);
140            PooledSession session;
141            try {
142                session = sessionPool.borrowObject(key);
143            } catch (Exception e) {
144                throw JMSExceptionSupport.create(e);
145            }
146            return session;
147        }
148    
149        public synchronized void close() {
150            if (connection != null) {
151                try {
152                    sessionPool.close();
153                } catch (Exception e) {
154                } finally {
155                    try {
156                        connection.close();
157                    } catch (Exception e) {
158                    } finally {
159                        connection = null;
160                    }
161                }
162            }
163        }
164    
165        public synchronized void incrementReferenceCount() {
166            referenceCount++;
167            lastUsed = System.currentTimeMillis();
168        }
169    
170        public synchronized void decrementReferenceCount() {
171            referenceCount--;
172            lastUsed = System.currentTimeMillis();
173            if (referenceCount == 0) {
174                expiredCheck();
175    
176                // Loaned sessions are those that are active in the sessionPool and
177                // have not been closed by the client before closing the connection.
178                // These need to be closed so that all session's reflect the fact
179                // that the parent Connection is closed.
180                for (PooledSession session : this.loanedSessions) {
181                    try {
182                        session.close();
183                    } catch (Exception e) {
184                    }
185                }
186                this.loanedSessions.clear();
187    
188                // We only clean up temporary destinations when all users of this
189                // connection have called close.
190                if (getConnection() != null) {
191                    getConnection().cleanUpTempDestinations();
192                }
193            }
194        }
195    
196        /**
197         * Determines if this Connection has expired.
198         * <p/>
199         * A ConnectionPool is considered expired when all references to it are released AND either
200         * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed.
201         * Once a ConnectionPool is determined to have expired its underlying Connection is closed.
202         *
203         * @return true if this connection has expired.
204         */
205        public synchronized boolean expiredCheck() {
206            if (connection == null) {
207                return true;
208            }
209    
210            if (hasExpired) {
211                if (referenceCount == 0) {
212                    close();
213                }
214                return true;
215            }
216    
217            if (hasFailed
218                    || (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
219                    || expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
220    
221                hasExpired = true;
222                if (referenceCount == 0) {
223                    close();
224                }
225                return true;
226            }
227            return false;
228        }
229    
230        public int getIdleTimeout() {
231            return idleTimeout;
232        }
233    
234        public void setIdleTimeout(int idleTimeout) {
235            this.idleTimeout = idleTimeout;
236        }
237    
238        public void setExpiryTimeout(long expiryTimeout) {
239            this.expiryTimeout = expiryTimeout;
240        }
241    
242        public long getExpiryTimeout() {
243            return expiryTimeout;
244        }
245    
246        public int getMaximumActiveSessionPerConnection() {
247            return this.sessionPool.getMaxActive();
248        }
249    
250        public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
251            this.sessionPool.setMaxActive(maximumActiveSessionPerConnection);
252        }
253    
254        /**
255         * @return the total number of Pooled session including idle sessions that are not
256         *          currently loaned out to any client.
257         */
258        public int getNumSessions() {
259            return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive();
260        }
261    
262        /**
263         * @return the total number of Sessions that are in the Session pool but not loaned out.
264         */
265        public int getNumIdleSessions() {
266            return this.sessionPool.getNumIdle();
267        }
268    
269        /**
270         * @return the total number of Session's that have been loaned to PooledConnection instances.
271         */
272        public int getNumActiveSessions() {
273            return this.sessionPool.getNumActive();
274        }
275    
276        /**
277         * Configure whether the createSession method should block when there are no more idle sessions and the
278         * pool already contains the maximum number of active sessions.  If false the create method will fail
279         * and throw an exception.
280         *
281         * @param block
282         *          Indicates whether blocking should be used to wait for more space to create a session.
283         */
284        public void setBlockIfSessionPoolIsFull(boolean block) {
285            this.sessionPool.setWhenExhaustedAction(
286                    (block ? GenericObjectPool.WHEN_EXHAUSTED_BLOCK : GenericObjectPool.WHEN_EXHAUSTED_FAIL));
287        }
288    
289        public boolean isBlockIfSessionPoolIsFull() {
290            return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
291        }
292    
293        @Override
294        public String toString() {
295            return "ConnectionPool[" + connection + "]";
296        }
297    }