001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.pool;
018    
019    import java.io.Serializable;
020    import java.util.Iterator;
021    import java.util.concurrent.CopyOnWriteArrayList;
022    
023    import javax.jms.BytesMessage;
024    import javax.jms.Destination;
025    import javax.jms.JMSException;
026    import javax.jms.MapMessage;
027    import javax.jms.Message;
028    import javax.jms.MessageConsumer;
029    import javax.jms.MessageListener;
030    import javax.jms.MessageProducer;
031    import javax.jms.ObjectMessage;
032    import javax.jms.Queue;
033    import javax.jms.QueueBrowser;
034    import javax.jms.QueueReceiver;
035    import javax.jms.QueueSender;
036    import javax.jms.QueueSession;
037    import javax.jms.Session;
038    import javax.jms.StreamMessage;
039    import javax.jms.TemporaryQueue;
040    import javax.jms.TemporaryTopic;
041    import javax.jms.TextMessage;
042    import javax.jms.Topic;
043    import javax.jms.TopicPublisher;
044    import javax.jms.TopicSession;
045    import javax.jms.TopicSubscriber;
046    import javax.jms.XASession;
047    import javax.transaction.xa.XAResource;
048    
049    import org.apache.activemq.ActiveMQMessageProducer;
050    import org.apache.activemq.ActiveMQQueueSender;
051    import org.apache.activemq.ActiveMQSession;
052    import org.apache.activemq.ActiveMQTopicPublisher;
053    import org.apache.activemq.AlreadyClosedException;
054    import org.apache.activemq.util.JMSExceptionSupport;
055    import org.apache.commons.pool.KeyedObjectPool;
056    import org.slf4j.Logger;
057    import org.slf4j.LoggerFactory;
058    
059    public class PooledSession implements Session, TopicSession, QueueSession, XASession {
060        private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
061    
062        private final SessionKey key;
063        private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
064        private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
065        private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
066        private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners =
067            new CopyOnWriteArrayList<PooledSessionEventListener>();
068    
069        private ActiveMQSession session;
070        private ActiveMQMessageProducer messageProducer;
071        private ActiveMQQueueSender queueSender;
072        private ActiveMQTopicPublisher topicPublisher;
073        private boolean transactional = true;
074        private boolean ignoreClose;
075        private boolean isXa;
076    
077        public PooledSession(SessionKey key, ActiveMQSession session, KeyedObjectPool<SessionKey, PooledSession> sessionPool) {
078            this.key = key;
079            this.session = session;
080            this.sessionPool = sessionPool;
081            this.transactional = session.isTransacted();
082        }
083    
084        public void addSessionEventListener(PooledSessionEventListener listener) {
085            // only add if really needed
086            if (!sessionEventListeners.contains(listener)) {
087                this.sessionEventListeners.add(listener);
088            }
089        }
090    
091        protected boolean isIgnoreClose() {
092            return ignoreClose;
093        }
094    
095        protected void setIgnoreClose(boolean ignoreClose) {
096            this.ignoreClose = ignoreClose;
097        }
098    
099        @Override
100        public void close() throws JMSException {
101            if (!ignoreClose) {
102                boolean invalidate = false;
103                try {
104                    // lets reset the session
105                    getInternalSession().setMessageListener(null);
106    
107                    // Close any consumers and browsers that may have been created.
108                    for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
109                        MessageConsumer consumer = iter.next();
110                        consumer.close();
111                    }
112    
113                    for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
114                        QueueBrowser browser = iter.next();
115                        browser.close();
116                    }
117    
118                    if (transactional && !isXa) {
119                        try {
120                            getInternalSession().rollback();
121                        } catch (JMSException e) {
122                            invalidate = true;
123                            LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
124                        }
125                    }
126                } catch (JMSException ex) {
127                    invalidate = true;
128                    LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
129                } finally {
130                    consumers.clear();
131                    browsers.clear();
132                    for (PooledSessionEventListener listener : this.sessionEventListeners) {
133                        listener.onSessionClosed(this);
134                    }
135                    sessionEventListeners.clear();
136                }
137    
138                if (invalidate) {
139                    // lets close the session and not put the session back into the pool
140                    // instead invalidate it so the pool can create a new one on demand.
141                    if (session != null) {
142                        try {
143                            session.close();
144                        } catch (JMSException e1) {
145                            LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
146                        }
147                        session = null;
148                    }
149                    try {
150                        sessionPool.invalidateObject(key, this);
151                    } catch (Exception e) {
152                        throw JMSExceptionSupport.create(e);
153                    }
154                } else {
155                    try {
156                        sessionPool.returnObject(key, this);
157                    } catch (Exception e) {
158                        throw JMSExceptionSupport.create(e);
159                    }
160                }
161            }
162        }
163    
164        @Override
165        public void commit() throws JMSException {
166            getInternalSession().commit();
167        }
168    
169        @Override
170        public BytesMessage createBytesMessage() throws JMSException {
171            return getInternalSession().createBytesMessage();
172        }
173    
174        @Override
175        public MapMessage createMapMessage() throws JMSException {
176            return getInternalSession().createMapMessage();
177        }
178    
179        @Override
180        public Message createMessage() throws JMSException {
181            return getInternalSession().createMessage();
182        }
183    
184        @Override
185        public ObjectMessage createObjectMessage() throws JMSException {
186            return getInternalSession().createObjectMessage();
187        }
188    
189        @Override
190        public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
191            return getInternalSession().createObjectMessage(serializable);
192        }
193    
194        @Override
195        public Queue createQueue(String s) throws JMSException {
196            return getInternalSession().createQueue(s);
197        }
198    
199        @Override
200        public StreamMessage createStreamMessage() throws JMSException {
201            return getInternalSession().createStreamMessage();
202        }
203    
204        @Override
205        public TemporaryQueue createTemporaryQueue() throws JMSException {
206            TemporaryQueue result;
207    
208            result = getInternalSession().createTemporaryQueue();
209    
210            // Notify all of the listeners of the created temporary Queue.
211            for (PooledSessionEventListener listener : this.sessionEventListeners) {
212                listener.onTemporaryQueueCreate(result);
213            }
214    
215            return result;
216        }
217    
218        @Override
219        public TemporaryTopic createTemporaryTopic() throws JMSException {
220            TemporaryTopic result;
221    
222            result = getInternalSession().createTemporaryTopic();
223    
224            // Notify all of the listeners of the created temporary Topic.
225            for (PooledSessionEventListener listener : this.sessionEventListeners) {
226                listener.onTemporaryTopicCreate(result);
227            }
228    
229            return result;
230        }
231    
232        @Override
233        public void unsubscribe(String s) throws JMSException {
234            getInternalSession().unsubscribe(s);
235        }
236    
237        @Override
238        public TextMessage createTextMessage() throws JMSException {
239            return getInternalSession().createTextMessage();
240        }
241    
242        @Override
243        public TextMessage createTextMessage(String s) throws JMSException {
244            return getInternalSession().createTextMessage(s);
245        }
246    
247        @Override
248        public Topic createTopic(String s) throws JMSException {
249            return getInternalSession().createTopic(s);
250        }
251    
252        @Override
253        public int getAcknowledgeMode() throws JMSException {
254            return getInternalSession().getAcknowledgeMode();
255        }
256    
257        @Override
258        public boolean getTransacted() throws JMSException {
259            return getInternalSession().getTransacted();
260        }
261    
262        @Override
263        public void recover() throws JMSException {
264            getInternalSession().recover();
265        }
266    
267        @Override
268        public void rollback() throws JMSException {
269            getInternalSession().rollback();
270        }
271    
272        @Override
273        public XAResource getXAResource() {
274            if (session == null) {
275                throw new IllegalStateException("Session is closed");
276            }
277            return session.getTransactionContext();
278        }
279    
280        @Override
281        public Session getSession() {
282            return this;
283        }
284    
285        @Override
286        public void run() {
287            if (session != null) {
288                session.run();
289            }
290        }
291    
292        // Consumer related methods
293        // -------------------------------------------------------------------------
294        @Override
295        public QueueBrowser createBrowser(Queue queue) throws JMSException {
296            return addQueueBrowser(getInternalSession().createBrowser(queue));
297        }
298    
299        @Override
300        public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
301            return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
302        }
303    
304        @Override
305        public MessageConsumer createConsumer(Destination destination) throws JMSException {
306            return addConsumer(getInternalSession().createConsumer(destination));
307        }
308    
309        @Override
310        public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
311            return addConsumer(getInternalSession().createConsumer(destination, selector));
312        }
313    
314        @Override
315        public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
316            return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
317        }
318    
319        @Override
320        public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
321            return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
322        }
323    
324        @Override
325        public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
326            return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
327        }
328    
329        @Override
330        public MessageListener getMessageListener() throws JMSException {
331            return getInternalSession().getMessageListener();
332        }
333    
334        @Override
335        public void setMessageListener(MessageListener messageListener) throws JMSException {
336            getInternalSession().setMessageListener(messageListener);
337        }
338    
339        @Override
340        public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
341            return addTopicSubscriber(getInternalSession().createSubscriber(topic));
342        }
343    
344        @Override
345        public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
346            return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
347        }
348    
349        @Override
350        public QueueReceiver createReceiver(Queue queue) throws JMSException {
351            return addQueueReceiver(getInternalSession().createReceiver(queue));
352        }
353    
354        @Override
355        public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
356            return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
357        }
358    
359        // Producer related methods
360        // -------------------------------------------------------------------------
361        @Override
362        public MessageProducer createProducer(Destination destination) throws JMSException {
363            return new PooledProducer(getMessageProducer(), destination);
364        }
365    
366        @Override
367        public QueueSender createSender(Queue queue) throws JMSException {
368            return new PooledQueueSender(getQueueSender(), queue);
369        }
370    
371        @Override
372        public TopicPublisher createPublisher(Topic topic) throws JMSException {
373            return new PooledTopicPublisher(getTopicPublisher(), topic);
374        }
375    
376        /**
377         * Callback invoked when the consumer is closed.
378         * <p/>
379         * This is used to keep track of an explicit closed consumer created by this
380         * session, by which we know do not need to keep track of the consumer, as
381         * its already closed.
382         *
383         * @param consumer
384         *            the consumer which is being closed
385         */
386        protected void onConsumerClose(MessageConsumer consumer) {
387            consumers.remove(consumer);
388        }
389    
390        public ActiveMQSession getInternalSession() throws AlreadyClosedException {
391            if (session == null) {
392                throw new AlreadyClosedException("The session has already been closed");
393            }
394            return session;
395        }
396    
397        public ActiveMQMessageProducer getMessageProducer() throws JMSException {
398            if (messageProducer == null) {
399                messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null);
400            }
401            return messageProducer;
402        }
403    
404        public ActiveMQQueueSender getQueueSender() throws JMSException {
405            if (queueSender == null) {
406                queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null);
407            }
408            return queueSender;
409        }
410    
411        public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
412            if (topicPublisher == null) {
413                topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null);
414            }
415            return topicPublisher;
416        }
417    
418        private QueueBrowser addQueueBrowser(QueueBrowser browser) {
419            browsers.add(browser);
420            return browser;
421        }
422    
423        private MessageConsumer addConsumer(MessageConsumer consumer) {
424            consumers.add(consumer);
425            // must wrap in PooledMessageConsumer to ensure the onConsumerClose method is
426            // invoked when the returned consumer is closed, to avoid memory leak in this
427            // session class in case many consumers is created
428            return new PooledMessageConsumer(this, consumer);
429        }
430    
431        private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
432            consumers.add(subscriber);
433            return subscriber;
434        }
435    
436        private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
437            consumers.add(receiver);
438            return receiver;
439        }
440    
441        public void setIsXa(boolean isXa) {
442            this.isXa = isXa;
443        }
444    
445        @Override
446        public String toString() {
447            return "PooledSession { " + session + " }";
448        }
449    }