001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.pool;
018
019 import java.io.Serializable;
020 import java.util.Iterator;
021 import java.util.concurrent.CopyOnWriteArrayList;
022
023 import javax.jms.BytesMessage;
024 import javax.jms.Destination;
025 import javax.jms.JMSException;
026 import javax.jms.MapMessage;
027 import javax.jms.Message;
028 import javax.jms.MessageConsumer;
029 import javax.jms.MessageListener;
030 import javax.jms.MessageProducer;
031 import javax.jms.ObjectMessage;
032 import javax.jms.Queue;
033 import javax.jms.QueueBrowser;
034 import javax.jms.QueueReceiver;
035 import javax.jms.QueueSender;
036 import javax.jms.QueueSession;
037 import javax.jms.Session;
038 import javax.jms.StreamMessage;
039 import javax.jms.TemporaryQueue;
040 import javax.jms.TemporaryTopic;
041 import javax.jms.TextMessage;
042 import javax.jms.Topic;
043 import javax.jms.TopicPublisher;
044 import javax.jms.TopicSession;
045 import javax.jms.TopicSubscriber;
046 import javax.jms.XASession;
047 import javax.transaction.xa.XAResource;
048
049 import org.apache.activemq.ActiveMQMessageProducer;
050 import org.apache.activemq.ActiveMQQueueSender;
051 import org.apache.activemq.ActiveMQSession;
052 import org.apache.activemq.ActiveMQTopicPublisher;
053 import org.apache.activemq.AlreadyClosedException;
054 import org.apache.activemq.util.JMSExceptionSupport;
055 import org.apache.commons.pool.KeyedObjectPool;
056 import org.slf4j.Logger;
057 import org.slf4j.LoggerFactory;
058
059 public class PooledSession implements Session, TopicSession, QueueSession, XASession {
060 private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
061
062 private final SessionKey key;
063 private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
064 private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
065 private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
066 private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners =
067 new CopyOnWriteArrayList<PooledSessionEventListener>();
068
069 private ActiveMQSession session;
070 private ActiveMQMessageProducer messageProducer;
071 private ActiveMQQueueSender queueSender;
072 private ActiveMQTopicPublisher topicPublisher;
073 private boolean transactional = true;
074 private boolean ignoreClose;
075 private boolean isXa;
076
077 public PooledSession(SessionKey key, ActiveMQSession session, KeyedObjectPool<SessionKey, PooledSession> sessionPool) {
078 this.key = key;
079 this.session = session;
080 this.sessionPool = sessionPool;
081 this.transactional = session.isTransacted();
082 }
083
084 public void addSessionEventListener(PooledSessionEventListener listener) {
085 // only add if really needed
086 if (!sessionEventListeners.contains(listener)) {
087 this.sessionEventListeners.add(listener);
088 }
089 }
090
091 protected boolean isIgnoreClose() {
092 return ignoreClose;
093 }
094
095 protected void setIgnoreClose(boolean ignoreClose) {
096 this.ignoreClose = ignoreClose;
097 }
098
099 @Override
100 public void close() throws JMSException {
101 if (!ignoreClose) {
102 boolean invalidate = false;
103 try {
104 // lets reset the session
105 getInternalSession().setMessageListener(null);
106
107 // Close any consumers and browsers that may have been created.
108 for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
109 MessageConsumer consumer = iter.next();
110 consumer.close();
111 }
112
113 for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
114 QueueBrowser browser = iter.next();
115 browser.close();
116 }
117
118 if (transactional && !isXa) {
119 try {
120 getInternalSession().rollback();
121 } catch (JMSException e) {
122 invalidate = true;
123 LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
124 }
125 }
126 } catch (JMSException ex) {
127 invalidate = true;
128 LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
129 } finally {
130 consumers.clear();
131 browsers.clear();
132 for (PooledSessionEventListener listener : this.sessionEventListeners) {
133 listener.onSessionClosed(this);
134 }
135 sessionEventListeners.clear();
136 }
137
138 if (invalidate) {
139 // lets close the session and not put the session back into the pool
140 // instead invalidate it so the pool can create a new one on demand.
141 if (session != null) {
142 try {
143 session.close();
144 } catch (JMSException e1) {
145 LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
146 }
147 session = null;
148 }
149 try {
150 sessionPool.invalidateObject(key, this);
151 } catch (Exception e) {
152 throw JMSExceptionSupport.create(e);
153 }
154 } else {
155 try {
156 sessionPool.returnObject(key, this);
157 } catch (Exception e) {
158 throw JMSExceptionSupport.create(e);
159 }
160 }
161 }
162 }
163
164 @Override
165 public void commit() throws JMSException {
166 getInternalSession().commit();
167 }
168
169 @Override
170 public BytesMessage createBytesMessage() throws JMSException {
171 return getInternalSession().createBytesMessage();
172 }
173
174 @Override
175 public MapMessage createMapMessage() throws JMSException {
176 return getInternalSession().createMapMessage();
177 }
178
179 @Override
180 public Message createMessage() throws JMSException {
181 return getInternalSession().createMessage();
182 }
183
184 @Override
185 public ObjectMessage createObjectMessage() throws JMSException {
186 return getInternalSession().createObjectMessage();
187 }
188
189 @Override
190 public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
191 return getInternalSession().createObjectMessage(serializable);
192 }
193
194 @Override
195 public Queue createQueue(String s) throws JMSException {
196 return getInternalSession().createQueue(s);
197 }
198
199 @Override
200 public StreamMessage createStreamMessage() throws JMSException {
201 return getInternalSession().createStreamMessage();
202 }
203
204 @Override
205 public TemporaryQueue createTemporaryQueue() throws JMSException {
206 TemporaryQueue result;
207
208 result = getInternalSession().createTemporaryQueue();
209
210 // Notify all of the listeners of the created temporary Queue.
211 for (PooledSessionEventListener listener : this.sessionEventListeners) {
212 listener.onTemporaryQueueCreate(result);
213 }
214
215 return result;
216 }
217
218 @Override
219 public TemporaryTopic createTemporaryTopic() throws JMSException {
220 TemporaryTopic result;
221
222 result = getInternalSession().createTemporaryTopic();
223
224 // Notify all of the listeners of the created temporary Topic.
225 for (PooledSessionEventListener listener : this.sessionEventListeners) {
226 listener.onTemporaryTopicCreate(result);
227 }
228
229 return result;
230 }
231
232 @Override
233 public void unsubscribe(String s) throws JMSException {
234 getInternalSession().unsubscribe(s);
235 }
236
237 @Override
238 public TextMessage createTextMessage() throws JMSException {
239 return getInternalSession().createTextMessage();
240 }
241
242 @Override
243 public TextMessage createTextMessage(String s) throws JMSException {
244 return getInternalSession().createTextMessage(s);
245 }
246
247 @Override
248 public Topic createTopic(String s) throws JMSException {
249 return getInternalSession().createTopic(s);
250 }
251
252 @Override
253 public int getAcknowledgeMode() throws JMSException {
254 return getInternalSession().getAcknowledgeMode();
255 }
256
257 @Override
258 public boolean getTransacted() throws JMSException {
259 return getInternalSession().getTransacted();
260 }
261
262 @Override
263 public void recover() throws JMSException {
264 getInternalSession().recover();
265 }
266
267 @Override
268 public void rollback() throws JMSException {
269 getInternalSession().rollback();
270 }
271
272 @Override
273 public XAResource getXAResource() {
274 if (session == null) {
275 throw new IllegalStateException("Session is closed");
276 }
277 return session.getTransactionContext();
278 }
279
280 @Override
281 public Session getSession() {
282 return this;
283 }
284
285 @Override
286 public void run() {
287 if (session != null) {
288 session.run();
289 }
290 }
291
292 // Consumer related methods
293 // -------------------------------------------------------------------------
294 @Override
295 public QueueBrowser createBrowser(Queue queue) throws JMSException {
296 return addQueueBrowser(getInternalSession().createBrowser(queue));
297 }
298
299 @Override
300 public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
301 return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
302 }
303
304 @Override
305 public MessageConsumer createConsumer(Destination destination) throws JMSException {
306 return addConsumer(getInternalSession().createConsumer(destination));
307 }
308
309 @Override
310 public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
311 return addConsumer(getInternalSession().createConsumer(destination, selector));
312 }
313
314 @Override
315 public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
316 return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
317 }
318
319 @Override
320 public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
321 return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
322 }
323
324 @Override
325 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
326 return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
327 }
328
329 @Override
330 public MessageListener getMessageListener() throws JMSException {
331 return getInternalSession().getMessageListener();
332 }
333
334 @Override
335 public void setMessageListener(MessageListener messageListener) throws JMSException {
336 getInternalSession().setMessageListener(messageListener);
337 }
338
339 @Override
340 public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
341 return addTopicSubscriber(getInternalSession().createSubscriber(topic));
342 }
343
344 @Override
345 public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
346 return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
347 }
348
349 @Override
350 public QueueReceiver createReceiver(Queue queue) throws JMSException {
351 return addQueueReceiver(getInternalSession().createReceiver(queue));
352 }
353
354 @Override
355 public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
356 return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
357 }
358
359 // Producer related methods
360 // -------------------------------------------------------------------------
361 @Override
362 public MessageProducer createProducer(Destination destination) throws JMSException {
363 return new PooledProducer(getMessageProducer(), destination);
364 }
365
366 @Override
367 public QueueSender createSender(Queue queue) throws JMSException {
368 return new PooledQueueSender(getQueueSender(), queue);
369 }
370
371 @Override
372 public TopicPublisher createPublisher(Topic topic) throws JMSException {
373 return new PooledTopicPublisher(getTopicPublisher(), topic);
374 }
375
376 /**
377 * Callback invoked when the consumer is closed.
378 * <p/>
379 * This is used to keep track of an explicit closed consumer created by this
380 * session, by which we know do not need to keep track of the consumer, as
381 * its already closed.
382 *
383 * @param consumer
384 * the consumer which is being closed
385 */
386 protected void onConsumerClose(MessageConsumer consumer) {
387 consumers.remove(consumer);
388 }
389
390 public ActiveMQSession getInternalSession() throws AlreadyClosedException {
391 if (session == null) {
392 throw new AlreadyClosedException("The session has already been closed");
393 }
394 return session;
395 }
396
397 public ActiveMQMessageProducer getMessageProducer() throws JMSException {
398 if (messageProducer == null) {
399 messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null);
400 }
401 return messageProducer;
402 }
403
404 public ActiveMQQueueSender getQueueSender() throws JMSException {
405 if (queueSender == null) {
406 queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null);
407 }
408 return queueSender;
409 }
410
411 public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
412 if (topicPublisher == null) {
413 topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null);
414 }
415 return topicPublisher;
416 }
417
418 private QueueBrowser addQueueBrowser(QueueBrowser browser) {
419 browsers.add(browser);
420 return browser;
421 }
422
423 private MessageConsumer addConsumer(MessageConsumer consumer) {
424 consumers.add(consumer);
425 // must wrap in PooledMessageConsumer to ensure the onConsumerClose method is
426 // invoked when the returned consumer is closed, to avoid memory leak in this
427 // session class in case many consumers is created
428 return new PooledMessageConsumer(this, consumer);
429 }
430
431 private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
432 consumers.add(subscriber);
433 return subscriber;
434 }
435
436 private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
437 consumers.add(receiver);
438 return receiver;
439 }
440
441 public void setIsXa(boolean isXa) {
442 this.isXa = isXa;
443 }
444
445 @Override
446 public String toString() {
447 return "PooledSession { " + session + " }";
448 }
449 }