001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.ra;
018
019import java.lang.reflect.Method;
020
021import java.util.concurrent.atomic.AtomicBoolean;
022import javax.jms.Connection;
023import javax.jms.ConnectionConsumer;
024import javax.jms.ExceptionListener;
025import javax.jms.JMSException;
026import javax.jms.Message;
027import javax.jms.MessageListener;
028import javax.jms.Session;
029import javax.jms.Topic;
030import javax.naming.InitialContext;
031import javax.naming.NamingException;
032import javax.resource.ResourceException;
033import javax.resource.spi.endpoint.MessageEndpointFactory;
034import javax.resource.spi.work.Work;
035import javax.resource.spi.work.WorkException;
036import javax.resource.spi.work.WorkManager;
037
038import org.apache.activemq.ActiveMQConnection;
039import org.apache.activemq.ActiveMQConnectionConsumer;
040import org.apache.activemq.command.ActiveMQDestination;
041import org.apache.activemq.command.ActiveMQQueue;
042import org.apache.activemq.command.ActiveMQTopic;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 *  $Date$
048 */
049public class ActiveMQEndpointWorker {
050
051    public static final Method ON_MESSAGE_METHOD;
052    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQEndpointWorker.class);
053
054    private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second.
055    private static final long MAX_RECONNECT_DELAY = 1000 * 30; // 30 seconds.
056    private static final ThreadLocal<Session> THREAD_LOCAL = new ThreadLocal<Session>();
057
058    static {
059        try {
060            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[] {
061                Message.class
062            });
063        } catch (Exception e) {
064            throw new ExceptionInInitializerError(e);
065        }
066    }
067
068    protected final ActiveMQEndpointActivationKey endpointActivationKey;
069    protected final MessageEndpointFactory endpointFactory;
070    protected final WorkManager workManager;
071    protected final boolean transacted;
072
073    private final ActiveMQDestination dest;
074    private final Work connectWork;
075    private final AtomicBoolean connecting = new AtomicBoolean(false);    
076    private final Object shutdownMutex = new String("shutdownMutex");
077    
078    private ActiveMQConnection connection;
079    private ActiveMQConnectionConsumer consumer;
080    private ServerSessionPoolImpl serverSessionPool;
081    private boolean running;
082
083    protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
084        this.endpointActivationKey = key;
085        this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
086        this.workManager = adapter.getBootstrapContext().getWorkManager();
087        try {
088            this.transacted = endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
089        } catch (NoSuchMethodException e) {
090            throw new ResourceException("Endpoint does not implement the onMessage method.");
091        }
092
093        connectWork = new Work() {
094            long currentReconnectDelay = INITIAL_RECONNECT_DELAY;
095
096            public void release() {
097                //
098            }
099
100            public void run() {
101                currentReconnectDelay = INITIAL_RECONNECT_DELAY;
102                MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
103                if (LOG.isInfoEnabled()) {
104                    LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
105                }
106
107                while (connecting.get() && running) {
108                    try {
109                        connection = adapter.makeConnection(activationSpec);
110                        connection.setExceptionListener(new ExceptionListener() {
111                            public void onException(JMSException error) {
112                                if (!serverSessionPool.isClosing()) {
113                                    // initiate reconnection only once, i.e. on initial exception
114                                    // and only if not already trying to connect
115                                    LOG.error("Connection to broker failed: " + error.getMessage(), error);
116                                    if (connecting.compareAndSet(false, true)) {
117                                        synchronized (connectWork) {
118                                            disconnect();
119                                            serverSessionPool.closeSessions();
120                                            connect();
121                                        }
122                                    } else {
123                                        // connection attempt has already been initiated
124                                        LOG.info("Connection attempt already in progress, ignoring connection exception");
125                                    }
126                                }
127                            }
128                        });
129                        connection.start();
130
131                        if (activationSpec.isDurableSubscription()) {
132                            consumer = (ActiveMQConnectionConsumer) connection.createDurableConnectionConsumer(
133                                    (Topic) dest,
134                                    activationSpec.getSubscriptionName(),
135                                    emptyToNull(activationSpec.getMessageSelector()),
136                                    serverSessionPool,
137                                    connection.getPrefetchPolicy().getDurableTopicPrefetch(),
138                                    activationSpec.getNoLocalBooleanValue());
139                        } else {
140                            consumer = (ActiveMQConnectionConsumer) connection.createConnectionConsumer(
141                                    dest,
142                                    emptyToNull(activationSpec.getMessageSelector()),
143                                    serverSessionPool,
144                                    getPrefetch(activationSpec, connection, dest),
145                                    activationSpec.getNoLocalBooleanValue());
146                        }
147
148
149                        if (connecting.compareAndSet(true, false)) {
150                            if (LOG.isInfoEnabled()) {
151                                LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
152                            }
153                        } else {
154                            LOG.error("Could not release connection lock");
155                        }
156
157                        if (consumer.getConsumerInfo().getCurrentPrefetchSize() == 0) {
158                            LOG.error("Endpoint " + endpointActivationKey.getActivationSpec() + " will not receive any messages due to broker 'zero prefetch' configuration for: " + dest);
159                        }
160
161                    } catch (JMSException error) {
162                        if (LOG.isDebugEnabled()) {
163                            LOG.debug("Failed to connect: " + error.getMessage(), error);
164                        }
165                        disconnect();
166                        pause(error);
167                    }
168                }
169            }
170
171            private int getPrefetch(MessageActivationSpec activationSpec, ActiveMQConnection connection, ActiveMQDestination destination) {
172                if (destination.isTopic()) {
173                    return connection.getPrefetchPolicy().getTopicPrefetch();
174                } else if (destination.isQueue()) {
175                    return connection.getPrefetchPolicy().getQueuePrefetch();
176                } else {
177                    return activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
178                }
179            }
180            
181            private void pause(JMSException error) {
182                if (currentReconnectDelay == MAX_RECONNECT_DELAY) {
183                    LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: " 
184                            + error.getMessage(), error);
185                    LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
186                }
187                try {
188                    synchronized ( shutdownMutex ) {
189                        // shutdownMutex will be notified by stop() method in
190                        // order to accelerate shutdown of endpoint
191                        shutdownMutex.wait(currentReconnectDelay);
192                    }
193                } catch ( InterruptedException e ) {
194                    Thread.interrupted();
195                }
196                currentReconnectDelay *= 2;
197                if (currentReconnectDelay > MAX_RECONNECT_DELAY) {
198                    currentReconnectDelay = MAX_RECONNECT_DELAY;
199                }                
200            }
201        };
202
203        MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
204        if (activationSpec.isUseJndi()) {
205            try {
206                InitialContext initialContext = new InitialContext();
207                dest = (ActiveMQDestination) initialContext.lookup(activationSpec.getDestination());
208            }
209            catch (NamingException exc) {
210                throw new ResourceException("JNDI lookup failed for "
211                    + activationSpec.getDestination());
212            }
213        }
214        else {
215            if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
216                dest = new ActiveMQQueue(activationSpec.getDestination());
217            }
218            else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
219                dest = new ActiveMQTopic(activationSpec.getDestination());
220            }
221            else {
222                throw new ResourceException("Unknown destination type: "
223                    + activationSpec.getDestinationType());
224            }
225        }
226    }
227
228    /**
229     * @param c
230     */
231    public static void safeClose(Connection c) {
232        try {
233            if (c != null) {
234                LOG.debug("Closing connection to broker");
235                c.close();
236            }
237        } catch (JMSException e) {
238            LOG.trace("failed to close c {}", c, e);
239        }
240    }
241
242    /**
243     * @param cc
244     */
245    public static void safeClose(ConnectionConsumer cc) {
246        try {
247            if (cc != null) {
248                LOG.debug("Closing ConnectionConsumer");
249                cc.close();
250            }
251        } catch (JMSException e) {
252            LOG.trace("failed to close cc {}", cc, e);
253        }
254    }
255
256    /**
257     * 
258     */
259    public void start() throws ResourceException {
260        synchronized (connectWork) {
261            if (running)
262            return;
263        running = true;
264
265            if ( connecting.compareAndSet(false, true) ) {
266                LOG.info("Starting");
267        serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
268        connect();
269            } else {
270                LOG.warn("Ignoring start command, EndpointWorker is already trying to connect");
271    }
272        }
273    }
274
275    /**
276     * 
277     */
278    public void stop() throws InterruptedException {
279        synchronized (shutdownMutex) {
280            if (!running)
281                return;
282            running = false;
283            LOG.info("Stopping");
284            // wake up pausing reconnect attempt
285            shutdownMutex.notifyAll();
286            serverSessionPool.close();
287        }
288        disconnect();
289    }
290
291    private boolean isRunning() {
292        return running;
293    }
294
295    private void connect() {
296        synchronized ( connectWork ) {
297            if (!running) {
298                return;
299            }
300
301            try {
302                workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
303            } catch (WorkException e) {
304                running = false;
305                LOG.error("Work Manager did not accept work: ", e);
306            }
307        }
308    }
309
310    /**
311     * 
312     */
313    private void disconnect() {
314        synchronized ( connectWork ) {
315        safeClose(consumer);
316        consumer = null;
317        safeClose(connection);
318        connection = null;
319    }
320            }
321
322    protected void registerThreadSession(Session session) {
323        THREAD_LOCAL.set(session);
324    }
325
326    protected void unregisterThreadSession(Session session) {
327        THREAD_LOCAL.set(null);
328    }
329
330    // for testing
331    public void setConnection(ActiveMQConnection activeMQConnection) {
332        this.connection = activeMQConnection;
333    }
334
335    protected ActiveMQConnection getConnection() {
336        // make sure we only return a working connection
337        // in particular make sure that we do not return null
338        // after the resource adapter got disconnected from
339        // the broker via the disconnect() method
340        synchronized ( connectWork ) {
341            return connection;
342        }
343    }
344
345    private String emptyToNull(String value) {
346        if (value == null || value.length() == 0) {
347            return null;
348        }
349        return value;
350    }
351
352}