001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.ra; 018 019import java.lang.reflect.Method; 020 021import javax.jms.JMSException; 022import javax.jms.Message; 023import javax.jms.MessageListener; 024import javax.jms.MessageProducer; 025import javax.jms.ServerSession; 026import javax.jms.Session; 027import javax.resource.spi.endpoint.MessageEndpoint; 028import javax.resource.spi.work.Work; 029import javax.resource.spi.work.WorkEvent; 030import javax.resource.spi.work.WorkException; 031import javax.resource.spi.work.WorkListener; 032import javax.resource.spi.work.WorkManager; 033 034import org.apache.activemq.ActiveMQSession; 035import org.apache.activemq.ActiveMQSession.DeliveryListener; 036import org.apache.activemq.TransactionContext; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * 042 */ 043public class ServerSessionImpl implements ServerSession, InboundContext, Work, DeliveryListener { 044 045 public static final Method ON_MESSAGE_METHOD; 046 private static int nextLogId; 047 048 static { 049 try { 050 ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[] { 051 Message.class 052 }); 053 } catch (Exception e) { 054 throw new ExceptionInInitializerError(e); 055 } 056 } 057 058 059 private int serverSessionId = getNextLogId(); 060 private final Logger log = LoggerFactory.getLogger(ServerSessionImpl.class.getName() + ":" + serverSessionId); 061 062 private ActiveMQSession session; 063 private WorkManager workManager; 064 private MessageEndpoint endpoint; 065 private MessageProducer messageProducer; 066 private final ServerSessionPoolImpl pool; 067 068 private Object runControlMutex = new Object(); 069 private boolean runningFlag; 070 /** 071 * True if an error was detected that cause this session to be stale. When a 072 * session is stale, it should not be used again for proccessing. 073 */ 074 private boolean stale; 075 /** 076 * Does the TX commit need to be managed by the RA? 077 */ 078 private final boolean useRAManagedTx; 079 /** 080 * The maximum number of messages to batch 081 */ 082 private final int batchSize; 083 /** 084 * The current number of messages in the batch 085 */ 086 private int currentBatchSize; 087 088 public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException { 089 this.pool = pool; 090 this.session = session; 091 this.workManager = workManager; 092 this.endpoint = endpoint; 093 this.useRAManagedTx = useRAManagedTx; 094 this.session.setMessageListener((MessageListener)endpoint); 095 this.session.setDeliveryListener(this); 096 this.batchSize = batchSize; 097 } 098 099 private static synchronized int getNextLogId() { 100 return nextLogId++; 101 } 102 103 public Session getSession() throws JMSException { 104 return session; 105 } 106 107 protected boolean isStale() { 108 return stale || !session.isRunning(); 109 } 110 111 public MessageProducer getMessageProducer() throws JMSException { 112 if (messageProducer == null) { 113 messageProducer = getSession().createProducer(null); 114 } 115 return messageProducer; 116 } 117 118 /** 119 * @see javax.jms.ServerSession#start() 120 */ 121 public void start() throws JMSException { 122 123 synchronized (runControlMutex) { 124 if (runningFlag) { 125 log.debug("Start request ignored, already running."); 126 return; 127 } 128 runningFlag = true; 129 } 130 131 // We get here because we need to start a async worker. 132 log.debug("Starting run."); 133 try { 134 workManager.scheduleWork(this, WorkManager.INDEFINITE, null, new WorkListener() { 135 // The work listener is useful only for debugging... 136 public void workAccepted(WorkEvent event) { 137 log.debug("Work accepted: " + event); 138 } 139 140 public void workRejected(WorkEvent event) { 141 log.debug("Work rejected: " + event); 142 } 143 144 public void workStarted(WorkEvent event) { 145 log.debug("Work started: " + event); 146 } 147 148 public void workCompleted(WorkEvent event) { 149 log.debug("Work completed: " + event); 150 } 151 152 }); 153 } catch (WorkException e) { 154 throw (JMSException)new JMSException("Start failed: " + e).initCause(e); 155 } 156 } 157 158 /** 159 * @see java.lang.Runnable#run() 160 */ 161 public void run() { 162 log.debug("Running"); 163 currentBatchSize = 0; 164 while (true) { 165 log.debug("run loop start"); 166 try { 167 InboundContextSupport.register(this); 168 if ( session.isRunning() ) { 169 session.run(); 170 } else { 171 log.debug("JMS Session {} with unconsumed {} is no longer running (maybe due to loss of connection?), marking ServerSession as stale", session, session.getUnconsumedMessages().size()); 172 stale = true; 173 } 174 } catch (Throwable e) { 175 stale = true; 176 if ( log.isDebugEnabled() ) { 177 log.debug("Endpoint {} failed to process message.", session, e); 178 } else if ( log.isInfoEnabled() ) { 179 log.info("Endpoint {} failed to process message. Reason: " + e.getMessage(), session); 180 } 181 } finally { 182 InboundContextSupport.unregister(this); 183 log.debug("run loop end"); 184 synchronized (runControlMutex) { 185 // This endpoint may have gone stale due to error 186 if (stale) { 187 runningFlag = false; 188 pool.removeFromPool(this); 189 break; 190 } 191 if (!session.hasUncomsumedMessages()) { 192 runningFlag = false; 193 log.debug("Session has no unconsumed message, returning to pool"); 194 pool.returnToPool(this); 195 break; 196 } 197 } 198 } 199 } 200 log.debug("Run finished"); 201 } 202 203 /** 204 * The ActiveMQSession's run method will call back to this method before 205 * dispactching a message to the MessageListener. 206 */ 207 public void beforeDelivery(ActiveMQSession session, Message msg) { 208 if (currentBatchSize == 0) { 209 try { 210 endpoint.beforeDelivery(ON_MESSAGE_METHOD); 211 } catch (Throwable e) { 212 throw new RuntimeException("Endpoint before delivery notification failure", e); 213 } 214 } 215 } 216 217 /** 218 * The ActiveMQSession's run method will call back to this method after 219 * dispactching a message to the MessageListener. 220 */ 221 public void afterDelivery(ActiveMQSession session, Message msg) { 222 if (++currentBatchSize >= batchSize || !session.hasUncomsumedMessages()) { 223 currentBatchSize = 0; 224 try { 225 endpoint.afterDelivery(); 226 } catch (Throwable e) { 227 throw new RuntimeException("Endpoint after delivery notification failure: " + e, e); 228 } finally { 229 TransactionContext transactionContext = session.getTransactionContext(); 230 if (transactionContext != null && transactionContext.isInLocalTransaction()) { 231 if (!useRAManagedTx) { 232 // Sanitiy Check: If the local transaction has not been 233 // commited.. 234 // Commit it now. 235 log.warn("Local transaction had not been commited. Commiting now."); 236 } 237 try { 238 session.commit(); 239 } catch (JMSException e) { 240 log.info("Commit failed:", e); 241 } 242 } 243 } 244 } 245 } 246 247 /** 248 * @see javax.resource.spi.work.Work#release() 249 */ 250 public void release() { 251 log.debug("release called"); 252 } 253 254 /** 255 * @see java.lang.Object#toString() 256 */ 257 @Override 258 public String toString() { 259 return "ServerSessionImpl:" + serverSessionId + "{" + session +"}"; 260 } 261 262 public void close() { 263 try { 264 endpoint.release(); 265 } catch (Throwable e) { 266 log.debug("Endpoint did not release properly: " + e.getMessage(), e); 267 } 268 try { 269 session.close(); 270 } catch (Throwable e) { 271 log.debug("Session did not close properly: " + e.getMessage(), e); 272 } 273 } 274 275}