/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.ra;

import java.lang.reflect.Method;

import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionConsumer;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Owns the broker connection and the connection consumer of one activated
 * message endpoint.
 * <p>
 * {@link #start()} hands a {@link Work} item to the {@link WorkManager}. That
 * work item keeps trying to open a connection and create a connection
 * consumer until it succeeds or the worker is stopped; between failed
 * attempts it sleeps for a delay that starts at one second and doubles up to
 * thirty seconds. When an established connection reports an exception the
 * worker tears it down and schedules the same work item again.
 */
public class ActiveMQEndpointWorker {

    public static final Method ON_MESSAGE_METHOD;
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQEndpointWorker.class);

    private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second.
    private static final long MAX_RECONNECT_DELAY = 1000 * 30; // 30 seconds.
    private static final ThreadLocal<Session> THREAD_LOCAL = new ThreadLocal<Session>();

    static {
        try {
            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", Message.class);
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    protected final ActiveMQEndpointActivationKey endpointActivationKey;
    protected final MessageEndpointFactory endpointFactory;
    protected final WorkManager workManager;
    protected final boolean transacted;

    private final ActiveMQDestination dest;
    private final Work connectWork;
    // true while a (re)connect attempt is pending or in progress
    private final AtomicBoolean connecting = new AtomicBoolean(false);
    // new String(...) yields a distinct, non-interned monitor object
    private final Object shutdownMutex = new String("shutdownMutex");

    private ActiveMQConnection connection;
    private ActiveMQConnectionConsumer consumer;
    private ServerSessionPoolImpl serverSessionPool;
    // volatile: written under connectWork in start()/connect() and under
    // shutdownMutex in stop(), and read without any lock by the reconnect loop
    private volatile boolean running;

    /**
     * Creates a worker for the given endpoint activation. No connection is
     * opened here; call {@link #start()} for that.
     *
     * @param adapter resource adapter that supplies the work manager and
     *                creates broker connections
     * @param key     activation key carrying the endpoint factory and the
     *                activation spec
     * @throws ResourceException if the endpoint factory does not know the
     *                           {@code onMessage} method, the JNDI lookup of the
     *                           destination fails, or the destination type is
     *                           neither {@code javax.jms.Queue} nor
     *                           {@code javax.jms.Topic}
     */
    protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
        this.endpointActivationKey = key;
        this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
        this.workManager = adapter.getBootstrapContext().getWorkManager();
        try {
            this.transacted = endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
        } catch (NoSuchMethodException e) {
            // keep the cause so the original failure is visible to the caller
            throw new ResourceException("Endpoint does not implement the onMessage method.", e);
        }

        connectWork = new Work() {
            long currentReconnectDelay = INITIAL_RECONNECT_DELAY;

            public void release() {
                //
            }

            public void run() {
                // every scheduled run starts over with the shortest delay
                currentReconnectDelay = INITIAL_RECONNECT_DELAY;
                MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
                if (LOG.isInfoEnabled()) {
                    LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
                }

                while (connecting.get() && running) {
                    try {
                        connection = adapter.makeConnection(activationSpec);
                        connection.setExceptionListener(new ExceptionListener() {
                            public void onException(JMSException error) {
                                if (!serverSessionPool.isClosing()) {
                                    // initiate reconnection only once, i.e. on initial exception
                                    // and only if not already trying to connect
                                    LOG.error("Connection to broker failed: " + error.getMessage(), error);
                                    if (connecting.compareAndSet(false, true)) {
                                        synchronized (connectWork) {
                                            disconnect();
                                            serverSessionPool.closeSessions();
                                            connect();
                                        }
                                    } else {
                                        // connection attempt has already been initiated
                                        LOG.info("Connection attempt already in progress, ignoring connection exception");
                                    }
                                }
                            }
                        });
                        connection.start();

                        if (activationSpec.isDurableSubscription()) {
                            consumer = (ActiveMQConnectionConsumer) connection.createDurableConnectionConsumer(
                                    (Topic) dest,
                                    activationSpec.getSubscriptionName(),
                                    emptyToNull(activationSpec.getMessageSelector()),
                                    serverSessionPool,
                                    connection.getPrefetchPolicy().getDurableTopicPrefetch(),
                                    activationSpec.getNoLocalBooleanValue());
                        } else {
                            consumer = (ActiveMQConnectionConsumer) connection.createConnectionConsumer(
                                    dest,
                                    emptyToNull(activationSpec.getMessageSelector()),
                                    serverSessionPool,
                                    getPrefetch(activationSpec, connection, dest),
                                    activationSpec.getNoLocalBooleanValue());
                        }

                        // clearing the flag ends the loop and re-arms the exception listener
                        if (connecting.compareAndSet(true, false)) {
                            if (LOG.isInfoEnabled()) {
                                LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
                            }
                        } else {
                            LOG.error("Could not release connection lock");
                        }

                        if (consumer.getConsumerInfo().getCurrentPrefetchSize() == 0) {
                            LOG.error("Endpoint " + endpointActivationKey.getActivationSpec() + " will not receive any messages due to broker 'zero prefetch' configuration for: " + dest);
                        }

                    } catch (JMSException error) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Failed to connect: " + error.getMessage(), error);
                        }
                        disconnect();
                        pause(error);
                    }
                }
            }

            /**
             * Picks the prefetch size for a non-durable connection consumer:
             * the connection's topic or queue prefetch, or, for any other
             * destination kind, max messages per session times max sessions.
             */
            private int getPrefetch(MessageActivationSpec activationSpec, ActiveMQConnection connection, ActiveMQDestination destination) {
                if (destination.isTopic()) {
                    return connection.getPrefetchPolicy().getTopicPrefetch();
                } else if (destination.isQueue()) {
                    return connection.getPrefetchPolicy().getQueuePrefetch();
                } else {
                    return activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
                }
            }

            /**
             * Sleeps for the current reconnect delay and then doubles it,
             * capped at {@code MAX_RECONNECT_DELAY}. The failure is logged at
             * error level only once the delay has reached the cap.
             */
            private void pause(JMSException error) {
                if (currentReconnectDelay == MAX_RECONNECT_DELAY) {
                    LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: "
                            + error.getMessage(), error);
                    LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
                }
                try {
                    synchronized (shutdownMutex) {
                        // shutdownMutex will be notified by stop() method in
                        // order to accelerate shutdown of endpoint.
                        // stop() clears 'running' while holding this mutex, so
                        // checking it here avoids sleeping through a
                        // notification that was sent before we got the lock.
                        if (running) {
                            shutdownMutex.wait(currentReconnectDelay);
                        }
                    }
                } catch (InterruptedException e) {
                    // NOTE(review): the interrupt is swallowed so the retry loop
                    // keeps going; re-asserting it here would make every later
                    // wait() throw immediately. Confirm this is the intent.
                    Thread.interrupted();
                }
                currentReconnectDelay *= 2;
                if (currentReconnectDelay > MAX_RECONNECT_DELAY) {
                    currentReconnectDelay = MAX_RECONNECT_DELAY;
                }
            }
        };

        dest = resolveDestination(endpointActivationKey.getActivationSpec());
    }

    /**
     * Resolves the destination named by the activation spec, either through a
     * JNDI lookup or by building a queue/topic from the configured name.
     *
     * @param activationSpec activation spec to read the destination from
     * @return the resolved destination
     * @throws ResourceException if the JNDI lookup fails or the destination
     *                           type is neither {@code javax.jms.Queue} nor
     *                           {@code javax.jms.Topic}
     */
    private static ActiveMQDestination resolveDestination(MessageActivationSpec activationSpec) throws ResourceException {
        if (activationSpec.isUseJndi()) {
            try {
                InitialContext initialContext = new InitialContext();
                return (ActiveMQDestination) initialContext.lookup(activationSpec.getDestination());
            } catch (NamingException exc) {
                // keep the cause so the JNDI failure reason is not lost
                throw new ResourceException("JNDI lookup failed for "
                        + activationSpec.getDestination(), exc);
            }
        }

        String destinationType = activationSpec.getDestinationType();
        if ("javax.jms.Queue".equals(destinationType)) {
            return new ActiveMQQueue(activationSpec.getDestination());
        }
        if ("javax.jms.Topic".equals(destinationType)) {
            return new ActiveMQTopic(activationSpec.getDestination());
        }
        throw new ResourceException("Unknown destination type: "
                + destinationType);
    }

    /**
     * Closes the connection, logging (at trace level) rather than propagating
     * any {@link JMSException}.
     *
     * @param c connection to close; {@code null} is ignored
     */
    public static void safeClose(Connection c) {
        try {
            if (c != null) {
                LOG.debug("Closing connection to broker");
                c.close();
            }
        } catch (JMSException e) {
            LOG.trace("failed to close c {}", c, e);
        }
    }

    /**
     * Closes the connection consumer, logging (at trace level) rather than
     * propagating any {@link JMSException}.
     *
     * @param cc connection consumer to close; {@code null} is ignored
     */
    public static void safeClose(ConnectionConsumer cc) {
        try {
            if (cc != null) {
                LOG.debug("Closing ConnectionConsumer");
                cc.close();
            }
        } catch (JMSException e) {
            LOG.trace("failed to close cc {}", cc, e);
        }
    }

    /**
     * Marks the worker as running and, unless a connect attempt is already
     * pending, creates the server session pool and schedules the connect work.
     * Does nothing if the worker is already running.
     *
     * @throws ResourceException declared for callers; not thrown by the code
     *                           visible in this method
     */
    public void start() throws ResourceException {
        synchronized (connectWork) {
            if (running) {
                return;
            }
            running = true;

            if (connecting.compareAndSet(false, true)) {
                LOG.info("Starting");
                serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
                connect();
            } else {
                LOG.warn("Ignoring start command, EndpointWorker is already trying to connect");
            }
        }
    }

    /**
     * Marks the worker as stopped, wakes a reconnect attempt that is sleeping
     * between retries, closes the server session pool and then closes the
     * consumer and connection. Does nothing if the worker is not running.
     *
     * @throws InterruptedException declared for callers; presumably raised by
     *                              the session pool shutdown - TODO confirm
     */
    public void stop() throws InterruptedException {
        synchronized (shutdownMutex) {
            if (!running) {
                return;
            }
            running = false;
            LOG.info("Stopping");
            // wake up pausing reconnect attempt
            shutdownMutex.notifyAll();
            serverSessionPool.close();
        }
        disconnect();
    }

    private boolean isRunning() {
        return running;
    }

    /**
     * Schedules the connect work on the work manager if the worker is running.
     * If the work manager rejects the work, the worker is marked as not
     * running and the failure is logged.
     */
    private void connect() {
        synchronized (connectWork) {
            if (!running) {
                return;
            }

            try {
                workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
            } catch (WorkException e) {
                running = false;
                LOG.error("Work Manager did not accept work: ", e);
            }
        }
    }

    /**
     * Closes and forgets the current consumer and connection, if any.
     */
    private void disconnect() {
        synchronized (connectWork) {
            safeClose(consumer);
            consumer = null;
            safeClose(connection);
            connection = null;
        }
    }

    /**
     * Associates the given session with the calling thread.
     *
     * @param session session to remember for the current thread
     */
    protected void registerThreadSession(Session session) {
        THREAD_LOCAL.set(session);
    }

    /**
     * Clears the session associated with the calling thread.
     *
     * @param session unused; whatever session is registered for the current
     *                thread is cleared
     */
    protected void unregisterThreadSession(Session session) {
        // remove() drops the thread's map entry instead of leaving a null value behind
        THREAD_LOCAL.remove();
    }

    // for testing
    public void setConnection(ActiveMQConnection activeMQConnection) {
        this.connection = activeMQConnection;
    }

    /**
     * Returns the current broker connection.
     *
     * @return the connection, or {@code null} when none is currently held
     */
    protected ActiveMQConnection getConnection() {
        // taking the connectWork lock means a disconnect() or the
        // listener-driven reconnect that is in progress completes before
        // the field is read
        synchronized (connectWork) {
            return connection;
        }
    }

    /**
     * Maps {@code null} and the empty string to {@code null}; any other value
     * is returned unchanged.
     */
    private String emptyToNull(String value) {
        if (value == null || value.length() == 0) {
            return null;
        }
        return value;
    }

}