001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.ra;
018    
019    import java.lang.reflect.Method;
020    
021    import java.util.concurrent.atomic.AtomicBoolean;
022    import javax.jms.Connection;
023    import javax.jms.ConnectionConsumer;
024    import javax.jms.Destination;
025    import javax.jms.ExceptionListener;
026    import javax.jms.JMSException;
027    import javax.jms.Message;
028    import javax.jms.MessageListener;
029    import javax.jms.Session;
030    import javax.jms.Topic;
031    import javax.resource.ResourceException;
032    import javax.resource.spi.endpoint.MessageEndpointFactory;
033    import javax.resource.spi.work.Work;
034    import javax.resource.spi.work.WorkException;
035    import javax.resource.spi.work.WorkManager;
036    
037    import org.apache.activemq.ActiveMQConnection;
038    import org.apache.activemq.ActiveMQConnectionConsumer;
039    import org.apache.activemq.command.ActiveMQDestination;
040    import org.apache.activemq.command.ActiveMQQueue;
041    import org.apache.activemq.command.ActiveMQTopic;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    
045    /**
046     *  $Date$
047     */
048    public class ActiveMQEndpointWorker {
049    
050        public static final Method ON_MESSAGE_METHOD;
051        private static final Logger LOG = LoggerFactory.getLogger(ActiveMQEndpointWorker.class);
052    
053        private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second.
054        private static final long MAX_RECONNECT_DELAY = 1000 * 30; // 30 seconds.
055        private static final ThreadLocal<Session> THREAD_LOCAL = new ThreadLocal<Session>();
056    
057        static {
058            try {
059                ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[] {
060                    Message.class
061                });
062            } catch (Exception e) {
063                throw new ExceptionInInitializerError(e);
064            }
065        }
066    
067        protected final ActiveMQEndpointActivationKey endpointActivationKey;
068        protected final MessageEndpointFactory endpointFactory;
069        protected final WorkManager workManager;
070        protected final boolean transacted;
071    
072        private final ActiveMQDestination dest;
073        private final Work connectWork;
074        private final AtomicBoolean connecting = new AtomicBoolean(false);    
075        private final Object shutdownMutex = new String("shutdownMutex");
076        
077        private ActiveMQConnection connection;
078        private ActiveMQConnectionConsumer consumer;
079        private ServerSessionPoolImpl serverSessionPool;
080        private boolean running;
081    
082        protected ActiveMQEndpointWorker(final MessageResourceAdapter adapter, ActiveMQEndpointActivationKey key) throws ResourceException {
083            this.endpointActivationKey = key;
084            this.endpointFactory = endpointActivationKey.getMessageEndpointFactory();
085            this.workManager = adapter.getBootstrapContext().getWorkManager();
086            try {
087                this.transacted = endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
088            } catch (NoSuchMethodException e) {
089                throw new ResourceException("Endpoint does not implement the onMessage method.");
090            }
091    
092            connectWork = new Work() {
093                long currentReconnectDelay = INITIAL_RECONNECT_DELAY;
094    
095                public void release() {
096                    //
097                }
098    
099                public void run() {
100                    currentReconnectDelay = INITIAL_RECONNECT_DELAY;
101                    MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
102                    if (LOG.isInfoEnabled()) {
103                        LOG.info("Establishing connection to broker [" + adapter.getInfo().getServerUrl() + "]");
104                    }
105    
106                    while (connecting.get() && running) {
107                        try {
108                            connection = adapter.makeConnection(activationSpec);
109                            connection.setExceptionListener(new ExceptionListener() {
110                                public void onException(JMSException error) {
111                                    if (!serverSessionPool.isClosing()) {
112                                        // initiate reconnection only once, i.e. on initial exception
113                                        // and only if not already trying to connect
114                                        LOG.error("Connection to broker failed: " + error.getMessage(), error);
115                                        if (connecting.compareAndSet(false, true)) {
116                                            synchronized (connectWork) {
117                                                disconnect();
118                                                serverSessionPool.closeIdleSessions();
119                                                connect();
120                                            }
121                                        } else {
122                                            // connection attempt has already been initiated
123                                            LOG.info("Connection attempt already in progress, ignoring connection exception");
124                                        }
125                                    }
126                                }
127                            });
128                            connection.start();
129    
130                            if (activationSpec.isDurableSubscription()) {
131                                consumer = (ActiveMQConnectionConsumer) connection.createDurableConnectionConsumer(
132                                        (Topic) dest,
133                                        activationSpec.getSubscriptionName(),
134                                        emptyToNull(activationSpec.getMessageSelector()),
135                                        serverSessionPool,
136                                        connection.getPrefetchPolicy().getDurableTopicPrefetch(),
137                                        activationSpec.getNoLocalBooleanValue());
138                            } else {
139                                consumer = (ActiveMQConnectionConsumer) connection.createConnectionConsumer(
140                                        dest,
141                                        emptyToNull(activationSpec.getMessageSelector()),
142                                        serverSessionPool,
143                                        getPrefetch(activationSpec, connection, dest),
144                                        activationSpec.getNoLocalBooleanValue());
145                            }
146    
147    
148                            if (connecting.compareAndSet(true, false)) {
149                                if (LOG.isInfoEnabled()) {
150                                    LOG.info("Successfully established connection to broker [" + adapter.getInfo().getServerUrl() + "]");
151                                }
152                            } else {
153                                LOG.error("Could not release connection lock");
154                            }
155    
156                            if (consumer.getConsumerInfo().getCurrentPrefetchSize() == 0) {
157                                LOG.error("Endpoint " + endpointActivationKey.getActivationSpec() + " will not receive any messages due to broker 'zero prefetch' configuration for: " + dest);
158                            }
159    
160                        } catch (JMSException error) {
161                            if (LOG.isDebugEnabled()) {
162                                LOG.debug("Failed to connect: " + error.getMessage(), error);
163                            }
164                            disconnect();
165                            pause(error);
166                        }
167                    }
168                }
169    
170                private int getPrefetch(MessageActivationSpec activationSpec, ActiveMQConnection connection, ActiveMQDestination destination) {
171                    if (destination.isTopic()) {
172                        return connection.getPrefetchPolicy().getTopicPrefetch();
173                    } else if (destination.isQueue()) {
174                        return connection.getPrefetchPolicy().getQueuePrefetch();
175                    } else {
176                        return activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
177                    }
178                }
179                
180                private void pause(JMSException error) {
181                    if (currentReconnectDelay == MAX_RECONNECT_DELAY) {
182                        LOG.error("Failed to connect to broker [" + adapter.getInfo().getServerUrl() + "]: " 
183                                + error.getMessage(), error);
184                        LOG.error("Endpoint will try to reconnect to the JMS broker in " + (MAX_RECONNECT_DELAY / 1000) + " seconds");
185                    }
186                    try {
187                        synchronized ( shutdownMutex ) {
188                            // shutdownMutex will be notified by stop() method in
189                            // order to accelerate shutdown of endpoint
190                            shutdownMutex.wait(currentReconnectDelay);
191                        }
192                    } catch ( InterruptedException e ) {
193                        Thread.interrupted();
194                    }
195                    currentReconnectDelay *= 2;
196                    if (currentReconnectDelay > MAX_RECONNECT_DELAY) {
197                        currentReconnectDelay = MAX_RECONNECT_DELAY;
198                    }                
199                }
200            };
201    
202            MessageActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
203            if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
204                dest = new ActiveMQQueue(activationSpec.getDestination());
205            } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
206                dest = new ActiveMQTopic(activationSpec.getDestination());
207            } else {
208                throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
209            }
210    
211        }
212    
213        /**
214         * @param c
215         */
216        public static void safeClose(Connection c) {
217            try {
218                if (c != null) {
219                    LOG.debug("Closing connection to broker");
220                    c.close();
221                }
222            } catch (JMSException e) {
223                //
224            }
225        }
226    
227        /**
228         * @param cc
229         */
230        public static void safeClose(ConnectionConsumer cc) {
231            try {
232                if (cc != null) {
233                    LOG.debug("Closing ConnectionConsumer");
234                    cc.close();
235                }
236            } catch (JMSException e) {
237                //
238            }
239        }
240    
241        /**
242         * 
243         */
244        public void start() throws ResourceException {
245            synchronized (connectWork) {
246                if (running)
247                return;
248            running = true;
249    
250                if ( connecting.compareAndSet(false, true) ) {
251                    LOG.info("Starting");
252            serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
253            connect();
254                } else {
255                    LOG.warn("Ignoring start command, EndpointWorker is already trying to connect");
256        }
257            }
258        }
259    
260        /**
261         * 
262         */
263        public void stop() throws InterruptedException {
264            synchronized (shutdownMutex) {
265                if (!running)
266                    return;
267                running = false;
268                LOG.info("Stopping");
269                // wake up pausing reconnect attempt
270                shutdownMutex.notifyAll();
271                serverSessionPool.close();
272            }
273            disconnect();
274        }
275    
276        private boolean isRunning() {
277            return running;
278        }
279    
280        private void connect() {
281            synchronized ( connectWork ) {
282            if (!running) {
283                return;
284            }
285    
286            try {
287                workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
288            } catch (WorkException e) {
289                running = false;
290                LOG.error("Work Manager did not accept work: ", e);
291            }
292        }
293        }
294    
295        /**
296         * 
297         */
298        private void disconnect() {
299            synchronized ( connectWork ) {
300            safeClose(consumer);
301            consumer = null;
302            safeClose(connection);
303            connection = null;
304        }
305                }
306    
307        protected void registerThreadSession(Session session) {
308            THREAD_LOCAL.set(session);
309        }
310    
311        protected void unregisterThreadSession(Session session) {
312            THREAD_LOCAL.set(null);
313        }
314    
315        protected ActiveMQConnection getConnection() {
316            // make sure we only return a working connection
317            // in particular make sure that we do not return null
318            // after the resource adapter got disconnected from
319            // the broker via the disconnect() method
320            synchronized ( connectWork ) {
321                return connection;
322            }
323        }
324    
325        private String emptyToNull(String value) {
326            if (value == null || value.length() == 0) {
327                return null;
328            }
329            return value;
330        }
331    
332    }