001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.pool;
018    
019    import java.util.List;
020    import java.util.concurrent.CopyOnWriteArrayList;
021    
022    import javax.jms.Connection;
023    import javax.jms.ConnectionConsumer;
024    import javax.jms.ConnectionMetaData;
025    import javax.jms.Destination;
026    import javax.jms.ExceptionListener;
027    import javax.jms.JMSException;
028    import javax.jms.Queue;
029    import javax.jms.QueueConnection;
030    import javax.jms.QueueSession;
031    import javax.jms.ServerSessionPool;
032    import javax.jms.Session;
033    import javax.jms.TemporaryQueue;
034    import javax.jms.TemporaryTopic;
035    import javax.jms.Topic;
036    import javax.jms.TopicConnection;
037    import javax.jms.TopicSession;
038    
039    import org.apache.activemq.ActiveMQConnection;
040    import org.apache.activemq.ActiveMQSession;
041    import org.apache.activemq.AlreadyClosedException;
042    import org.apache.activemq.EnhancedConnection;
043    import org.apache.activemq.advisory.DestinationSource;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    /**
048     * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
049     * {@link QueueConnection} which is pooled and on {@link #close()} will return
050     * its reference to the ConnectionPool backing it.
051     *
052     * <b>NOTE</b> this implementation is only intended for use when sending
053     * messages. It does not deal with pooling of consumers; for that look at a
054     * library like <a href="http://jencks.org/">Jencks</a> such as in <a
055     * href="http://jencks.org/Message+Driven+POJOs">this example</a>
056     *
057     */
058    public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection, PooledSessionEventListener {
059        private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
060    
061        private ConnectionPool pool;
062        private volatile boolean stopped;
063        private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
064        private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
065        private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
066    
067        /**
068         * Creates a new PooledConnection instance that uses the given ConnectionPool to create
069         * and manage its resources.  The ConnectionPool instance can be shared amongst many
070         * PooledConnection instances.
071         *
072         * @param pool
073         *      The connection and pool manager backing this proxy connection object.
074         */
075        public PooledConnection(ConnectionPool pool) {
076            this.pool = pool;
077            this.pool.incrementReferenceCount();
078        }
079    
080        /**
081         * Factory method to create a new instance.
082         */
083        public PooledConnection newInstance() {
084            return new PooledConnection(pool);
085        }
086    
087        @Override
088        public void close() throws JMSException {
089            this.cleanupConnectionTemporaryDestinations();
090            this.cleanupAllLoanedSessions();
091            if (this.pool != null) {
092                this.pool.decrementReferenceCount();
093                this.pool = null;
094            }
095        }
096    
097        @Override
098        public void start() throws JMSException {
099            assertNotClosed();
100            pool.start();
101        }
102    
103        @Override
104        public void stop() throws JMSException {
105            stopped = true;
106        }
107    
108        @Override
109        public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
110            return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
111        }
112    
113        @Override
114        public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
115            return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
116        }
117    
118        @Override
119        public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
120            return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
121        }
122    
123        @Override
124        public String getClientID() throws JMSException {
125            return getConnection().getClientID();
126        }
127    
128        @Override
129        public ExceptionListener getExceptionListener() throws JMSException {
130            return getConnection().getExceptionListener();
131        }
132    
133        @Override
134        public ConnectionMetaData getMetaData() throws JMSException {
135            return getConnection().getMetaData();
136        }
137    
138        @Override
139        public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
140            getConnection().setExceptionListener(exceptionListener);
141        }
142    
143        @Override
144        public void setClientID(String clientID) throws JMSException {
145    
146            // ignore repeated calls to setClientID() with the same client id
147            // this could happen when a JMS component such as Spring that uses a
148            // PooledConnectionFactory shuts down and reinitializes.
149            if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) {
150                getConnection().setClientID(clientID);
151            }
152        }
153    
154        @Override
155        public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
156            return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
157        }
158    
159        // Session factory methods
160        // -------------------------------------------------------------------------
161        @Override
162        public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
163            return (QueueSession) createSession(transacted, ackMode);
164        }
165    
166        @Override
167        public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
168            return (TopicSession) createSession(transacted, ackMode);
169        }
170    
171        @Override
172        public Session createSession(boolean transacted, int ackMode) throws JMSException {
173            PooledSession result;
174            result = (PooledSession) pool.createSession(transacted, ackMode);
175    
176            // Store the session so we can close the sessions that this PooledConnection
177            // created in order to ensure that consumers etc are closed per the JMS contract.
178            loanedSessions.add(result);
179    
180            // Add a event listener to the session that notifies us when the session
181            // creates / destroys temporary destinations and closes etc.
182            result.addSessionEventListener(this);
183            return result;
184        }
185    
186        // EnhancedCollection API
187        // -------------------------------------------------------------------------
188    
189        @Override
190        public DestinationSource getDestinationSource() throws JMSException {
191            return getConnection().getDestinationSource();
192        }
193    
194        // Implementation methods
195        // -------------------------------------------------------------------------
196    
197        @Override
198        public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
199            connTempQueues.add(tempQueue);
200        }
201    
202        @Override
203        public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
204            connTempTopics.add(tempTopic);
205        }
206    
207        @Override
208        public void onSessionClosed(PooledSession session) {
209            if (session != null) {
210                this.loanedSessions.remove(session);
211            }
212        }
213    
214        public ActiveMQConnection getConnection() throws JMSException {
215            assertNotClosed();
216            return pool.getConnection();
217        }
218    
219        protected void assertNotClosed() throws AlreadyClosedException {
220            if (stopped || pool == null) {
221                throw new AlreadyClosedException();
222            }
223        }
224    
225        protected ActiveMQSession createSession(SessionKey key) throws JMSException {
226            return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
227        }
228    
229        @Override
230        public String toString() {
231            return "PooledConnection { " + pool + " }";
232        }
233    
234        /**
235         * Remove all of the temporary destinations created for this connection.
236         * This is important since the underlying connection may be reused over a
237         * long period of time, accumulating all of the temporary destinations from
238         * each use. However, from the perspective of the lifecycle from the
239         * client's view, close() closes the connection and, therefore, deletes all
240         * of the temporary destinations created.
241         */
242        protected void cleanupConnectionTemporaryDestinations() {
243    
244            for (TemporaryQueue tempQueue : connTempQueues) {
245                try {
246                    tempQueue.delete();
247                } catch (JMSException ex) {
248                    LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
249                }
250            }
251            connTempQueues.clear();
252    
253            for (TemporaryTopic tempTopic : connTempTopics) {
254                try {
255                    tempTopic.delete();
256                } catch (JMSException ex) {
257                    LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
258                }
259            }
260            connTempTopics.clear();
261        }
262    
263        /**
264         * The PooledSession tracks all Sessions that it created and now we close them.  Closing the
265         * PooledSession will return the internal Session to the Pool of Session after cleaning up
266         * all the resources that the Session had allocated for this PooledConnection.
267         */
268        protected void cleanupAllLoanedSessions() {
269    
270            for (PooledSession session : loanedSessions) {
271                try {
272                    session.close();
273                } catch (JMSException ex) {
274                    LOG.info("failed to close laoned Session \"" + session + "\" on closing pooled connection: " + ex.getMessage());
275                }
276            }
277            loanedSessions.clear();
278        }
279    
280        /**
281         * @return the total number of Pooled session including idle sessions that are not
282         *          currently loaned out to any client.
283         */
284        public int getNumSessions() {
285            return this.pool.getNumSessions();
286        }
287    
288        /**
289         * @return the number of Sessions that are currently checked out of this Connection's session pool.
290         */
291        public int getNumActiveSessions() {
292            return this.pool.getNumActiveSessions();
293        }
294    
295        /**
296         * @return the number of Sessions that are idle in this Connection's sessions pool.
297         */
298        public int getNumtIdleSessions() {
299            return this.pool.getNumIdleSessions();
300        }
301    }