001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.jms.pool;
018    
019    import java.io.Serializable;
020    import java.util.Iterator;
021    import java.util.concurrent.CopyOnWriteArrayList;
022    
023    import javax.jms.BytesMessage;
024    import javax.jms.Destination;
025    import javax.jms.JMSException;
026    import javax.jms.MapMessage;
027    import javax.jms.Message;
028    import javax.jms.MessageConsumer;
029    import javax.jms.MessageListener;
030    import javax.jms.MessageProducer;
031    import javax.jms.ObjectMessage;
032    import javax.jms.Queue;
033    import javax.jms.QueueBrowser;
034    import javax.jms.QueueReceiver;
035    import javax.jms.QueueSender;
036    import javax.jms.QueueSession;
037    import javax.jms.Session;
038    import javax.jms.StreamMessage;
039    import javax.jms.TemporaryQueue;
040    import javax.jms.TemporaryTopic;
041    import javax.jms.TextMessage;
042    import javax.jms.Topic;
043    import javax.jms.TopicPublisher;
044    import javax.jms.TopicSession;
045    import javax.jms.TopicSubscriber;
046    import javax.jms.XASession;
047    import javax.transaction.xa.XAResource;
048    
049    import org.apache.commons.pool.KeyedObjectPool;
050    import org.slf4j.Logger;
051    import org.slf4j.LoggerFactory;
052    
053    public class PooledSession implements Session, TopicSession, QueueSession, XASession {
054        private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
055    
056        private final SessionKey key;
057        private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
058        private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
059        private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
060        private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners =
061            new CopyOnWriteArrayList<PooledSessionEventListener>();
062    
063        private Session session;
064        private MessageProducer messageProducer;
065        private QueueSender queueSender;
066        private TopicPublisher topicPublisher;
067        private boolean transactional = true;
068        private boolean ignoreClose;
069        private boolean isXa;
070    
071        public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional) {
072            this.key = key;
073            this.session = session;
074            this.sessionPool = sessionPool;
075            this.transactional = transactional;
076        }
077    
078        public void addSessionEventListener(PooledSessionEventListener listener) {
079            // only add if really needed
080            if (!sessionEventListeners.contains(listener)) {
081                this.sessionEventListeners.add(listener);
082            }
083        }
084    
085        protected boolean isIgnoreClose() {
086            return ignoreClose;
087        }
088    
089        protected void setIgnoreClose(boolean ignoreClose) {
090            this.ignoreClose = ignoreClose;
091        }
092    
093        @Override
094        public void close() throws JMSException {
095            if (!ignoreClose) {
096                boolean invalidate = false;
097                try {
098                    // lets reset the session
099                    getInternalSession().setMessageListener(null);
100    
101                    // Close any consumers and browsers that may have been created.
102                    for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
103                        MessageConsumer consumer = iter.next();
104                        consumer.close();
105                    }
106    
107                    for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
108                        QueueBrowser browser = iter.next();
109                        browser.close();
110                    }
111    
112                    if (transactional && !isXa) {
113                        try {
114                            getInternalSession().rollback();
115                        } catch (JMSException e) {
116                            invalidate = true;
117                            LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
118                        }
119                    }
120                } catch (JMSException ex) {
121                    invalidate = true;
122                    LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
123                } finally {
124                    consumers.clear();
125                    browsers.clear();
126                    for (PooledSessionEventListener listener : this.sessionEventListeners) {
127                        listener.onSessionClosed(this);
128                    }
129                    sessionEventListeners.clear();
130                }
131    
132                if (invalidate) {
133                    // lets close the session and not put the session back into the pool
134                    // instead invalidate it so the pool can create a new one on demand.
135                    if (session != null) {
136                        try {
137                            session.close();
138                        } catch (JMSException e1) {
139                            LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
140                        }
141                        session = null;
142                    }
143                    try {
144                        sessionPool.invalidateObject(key, this);
145                    } catch (Exception e) {
146                        LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
147                    }
148                } else {
149                    try {
150                        sessionPool.returnObject(key, this);
151                    } catch (Exception e) {
152                        javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
153                        illegalStateException.initCause(e);
154                        throw illegalStateException;
155                    }
156                }
157            }
158        }
159    
160        @Override
161        public void commit() throws JMSException {
162            getInternalSession().commit();
163        }
164    
165        @Override
166        public BytesMessage createBytesMessage() throws JMSException {
167            return getInternalSession().createBytesMessage();
168        }
169    
170        @Override
171        public MapMessage createMapMessage() throws JMSException {
172            return getInternalSession().createMapMessage();
173        }
174    
175        @Override
176        public Message createMessage() throws JMSException {
177            return getInternalSession().createMessage();
178        }
179    
180        @Override
181        public ObjectMessage createObjectMessage() throws JMSException {
182            return getInternalSession().createObjectMessage();
183        }
184    
185        @Override
186        public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
187            return getInternalSession().createObjectMessage(serializable);
188        }
189    
190        @Override
191        public Queue createQueue(String s) throws JMSException {
192            return getInternalSession().createQueue(s);
193        }
194    
195        @Override
196        public StreamMessage createStreamMessage() throws JMSException {
197            return getInternalSession().createStreamMessage();
198        }
199    
200        @Override
201        public TemporaryQueue createTemporaryQueue() throws JMSException {
202            TemporaryQueue result;
203    
204            result = getInternalSession().createTemporaryQueue();
205    
206            // Notify all of the listeners of the created temporary Queue.
207            for (PooledSessionEventListener listener : this.sessionEventListeners) {
208                listener.onTemporaryQueueCreate(result);
209            }
210    
211            return result;
212        }
213    
214        @Override
215        public TemporaryTopic createTemporaryTopic() throws JMSException {
216            TemporaryTopic result;
217    
218            result = getInternalSession().createTemporaryTopic();
219    
220            // Notify all of the listeners of the created temporary Topic.
221            for (PooledSessionEventListener listener : this.sessionEventListeners) {
222                listener.onTemporaryTopicCreate(result);
223            }
224    
225            return result;
226        }
227    
228        @Override
229        public void unsubscribe(String s) throws JMSException {
230            getInternalSession().unsubscribe(s);
231        }
232    
233        @Override
234        public TextMessage createTextMessage() throws JMSException {
235            return getInternalSession().createTextMessage();
236        }
237    
238        @Override
239        public TextMessage createTextMessage(String s) throws JMSException {
240            return getInternalSession().createTextMessage(s);
241        }
242    
243        @Override
244        public Topic createTopic(String s) throws JMSException {
245            return getInternalSession().createTopic(s);
246        }
247    
248        @Override
249        public int getAcknowledgeMode() throws JMSException {
250            return getInternalSession().getAcknowledgeMode();
251        }
252    
253        @Override
254        public boolean getTransacted() throws JMSException {
255            return getInternalSession().getTransacted();
256        }
257    
258        @Override
259        public void recover() throws JMSException {
260            getInternalSession().recover();
261        }
262    
263        @Override
264        public void rollback() throws JMSException {
265            getInternalSession().rollback();
266        }
267    
268        @Override
269        public XAResource getXAResource() {
270            if (session instanceof XASession) {
271                return ((XASession)session).getXAResource();
272            }
273            return null;
274        }
275    
276        @Override
277        public Session getSession() {
278            return this;
279        }
280    
281        @Override
282        public void run() {
283            if (session != null) {
284                session.run();
285            }
286        }
287    
288        // Consumer related methods
289        // -------------------------------------------------------------------------
290        @Override
291        public QueueBrowser createBrowser(Queue queue) throws JMSException {
292            return addQueueBrowser(getInternalSession().createBrowser(queue));
293        }
294    
295        @Override
296        public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
297            return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
298        }
299    
300        @Override
301        public MessageConsumer createConsumer(Destination destination) throws JMSException {
302            return addConsumer(getInternalSession().createConsumer(destination));
303        }
304    
305        @Override
306        public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
307            return addConsumer(getInternalSession().createConsumer(destination, selector));
308        }
309    
310        @Override
311        public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
312            return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
313        }
314    
315        @Override
316        public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
317            return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
318        }
319    
320        @Override
321        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
322            return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
323        }
324    
325        @Override
326        public MessageListener getMessageListener() throws JMSException {
327            return getInternalSession().getMessageListener();
328        }
329    
330        @Override
331        public void setMessageListener(MessageListener messageListener) throws JMSException {
332            getInternalSession().setMessageListener(messageListener);
333        }
334    
335        @Override
336        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
337            return addTopicSubscriber(((TopicSession)getInternalSession()).createSubscriber(topic));
338        }
339    
340        @Override
341        public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
342            return addTopicSubscriber(((TopicSession)getInternalSession()).createSubscriber(topic, selector, local));
343        }
344    
345        @Override
346        public QueueReceiver createReceiver(Queue queue) throws JMSException {
347            return addQueueReceiver(((QueueSession)getInternalSession()).createReceiver(queue));
348        }
349    
350        @Override
351        public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
352            return addQueueReceiver(((QueueSession)getInternalSession()).createReceiver(queue, selector));
353        }
354    
355        // Producer related methods
356        // -------------------------------------------------------------------------
357        @Override
358        public MessageProducer createProducer(Destination destination) throws JMSException {
359            return new PooledProducer(getMessageProducer(), destination);
360        }
361    
362        @Override
363        public QueueSender createSender(Queue queue) throws JMSException {
364            return new PooledQueueSender(getQueueSender(), queue);
365        }
366    
367        @Override
368        public TopicPublisher createPublisher(Topic topic) throws JMSException {
369            return new PooledTopicPublisher(getTopicPublisher(), topic);
370        }
371    
372        /**
373         * Callback invoked when the consumer is closed.
374         * <p/>
375         * This is used to keep track of an explicit closed consumer created by this
376         * session, by which we know do not need to keep track of the consumer, as
377         * its already closed.
378         *
379         * @param consumer
380         *            the consumer which is being closed
381         */
382        protected void onConsumerClose(MessageConsumer consumer) {
383            consumers.remove(consumer);
384        }
385    
386        public Session getInternalSession() throws IllegalStateException {
387            if (session == null) {
388                throw new IllegalStateException("The session has already been closed");
389            }
390            return session;
391        }
392    
393        public MessageProducer getMessageProducer() throws JMSException {
394            if (messageProducer == null) {
395                messageProducer = getInternalSession().createProducer(null);
396            }
397            return messageProducer;
398        }
399    
400        public QueueSender getQueueSender() throws JMSException {
401            if (queueSender == null) {
402                queueSender = ((QueueSession)getInternalSession()).createSender(null);
403            }
404            return queueSender;
405        }
406    
407        public TopicPublisher getTopicPublisher() throws JMSException {
408            if (topicPublisher == null) {
409                topicPublisher = ((TopicSession)getInternalSession()).createPublisher(null);
410            }
411            return topicPublisher;
412        }
413    
414        private QueueBrowser addQueueBrowser(QueueBrowser browser) {
415            browsers.add(browser);
416            return browser;
417        }
418    
419        private MessageConsumer addConsumer(MessageConsumer consumer) {
420            consumers.add(consumer);
421            // must wrap in PooledMessageConsumer to ensure the onConsumerClose method is
422            // invoked when the returned consumer is closed, to avoid memory leak in this
423            // session class in case many consumers is created
424            return new PooledMessageConsumer(this, consumer);
425        }
426    
427        private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
428            consumers.add(subscriber);
429            return subscriber;
430        }
431    
432        private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
433            consumers.add(receiver);
434            return receiver;
435        }
436    
437        public void setIsXa(boolean isXa) {
438            this.isXa = isXa;
439        }
440    
441        @Override
442        public String toString() {
443            return "PooledSession { " + session + " }";
444        }
445    }