001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import java.util.Iterator;
020    import java.util.List;
021    import java.util.Map;
022    import java.util.concurrent.CopyOnWriteArrayList;
023    import java.util.concurrent.LinkedBlockingQueue;
024    import java.util.concurrent.ThreadFactory;
025    import java.util.concurrent.ThreadPoolExecutor;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.atomic.AtomicBoolean;
028    import java.util.concurrent.atomic.AtomicReference;
029    
030    import javax.jms.Connection;
031    import javax.jms.Destination;
032    import javax.jms.QueueConnection;
033    
034    import org.apache.activemq.ActiveMQConnectionFactory;
035    import org.apache.activemq.Service;
036    import org.apache.activemq.broker.BrokerService;
037    import org.apache.activemq.util.LRUCache;
038    import org.apache.activemq.util.ThreadPoolUtils;
039    import org.slf4j.Logger;
040    import org.slf4j.LoggerFactory;
041    import org.springframework.jndi.JndiTemplate;
042    
043    /**
044     * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
045     * JMS providers are still only in compliance with JMS v1.0.1 , this bridge itself
046     * aimed to be in compliance with the JMS 1.0.2 specification.
047     */
048    public abstract class JmsConnector implements Service {
049    
050        private static int nextId;
051        private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);
052    
053        protected boolean preferJndiDestinationLookup = false;
054        protected JndiTemplate jndiLocalTemplate;
055        protected JndiTemplate jndiOutboundTemplate;
056        protected JmsMesageConvertor inboundMessageConvertor;
057        protected JmsMesageConvertor outboundMessageConvertor;
058        protected AtomicBoolean initialized = new AtomicBoolean(false);
059        protected AtomicBoolean localSideInitialized = new AtomicBoolean(false);
060        protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false);
061        protected AtomicBoolean started = new AtomicBoolean(false);
062        protected AtomicBoolean failed = new AtomicBoolean();
063        protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>();
064        protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>();
065        protected ActiveMQConnectionFactory embeddedConnectionFactory;
066        protected int replyToDestinationCacheSize = 10000;
067        protected String outboundUsername;
068        protected String outboundPassword;
069        protected String localUsername;
070        protected String localPassword;
071        protected String outboundClientId;
072        protected String localClientId;
073        protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();
074    
075        private ReconnectionPolicy policy = new ReconnectionPolicy();
076        protected ThreadPoolExecutor connectionSerivce;
077        private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
078        private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
079        private String name;
080    
081        private static LRUCache<Destination, DestinationBridge> createLRUCache() {
082            return new LRUCache<Destination, DestinationBridge>() {
083                private static final long serialVersionUID = -7446792754185879286L;
084    
085                protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) {
086                    if (size() > maxCacheSize) {
087                        Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator();
088                        Map.Entry<Destination, DestinationBridge> lru = iter.next();
089                        remove(lru.getKey());
090                        DestinationBridge bridge = (DestinationBridge)lru.getValue();
091                        try {
092                            bridge.stop();
093                            LOG.info("Expired bridge: " + bridge);
094                        } catch (Exception e) {
095                            LOG.warn("stopping expired bridge" + bridge + " caused an exception", e);
096                        }
097                    }
098                    return false;
099                }
100            };
101        }
102    
103        public boolean init() {
104            boolean result = initialized.compareAndSet(false, true);
105            if (result) {
106                if (jndiLocalTemplate == null) {
107                    jndiLocalTemplate = new JndiTemplate();
108                }
109                if (jndiOutboundTemplate == null) {
110                    jndiOutboundTemplate = new JndiTemplate();
111                }
112                if (inboundMessageConvertor == null) {
113                    inboundMessageConvertor = new SimpleJmsMessageConvertor();
114                }
115                if (outboundMessageConvertor == null) {
116                    outboundMessageConvertor = new SimpleJmsMessageConvertor();
117                }
118                replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
119    
120                connectionSerivce = createExecutor();
121    
122                // Subclasses can override this to customize their own it.
123                result = doConnectorInit();
124            }
125            return result;
126        }
127    
128        protected boolean doConnectorInit() {
129    
130            // We try to make a connection via a sync call first so that the
131            // JmsConnector is fully initialized before the start call returns
132            // in order to avoid missing any messages that are dispatched
133            // immediately after startup.  If either side fails we queue an
134            // asynchronous task to manage the reconnect attempts.
135    
136            try {
137                initializeLocalConnection();
138                localSideInitialized.set(true);
139            } catch(Exception e) {
140                // Queue up the task to attempt the local connection.
141                scheduleAsyncLocalConnectionReconnect();
142            }
143    
144            try {
145                initializeForeignConnection();
146                foreignSideInitialized.set(true);
147            } catch(Exception e) {
148                // Queue up the task for the foreign connection now.
149                scheduleAsyncForeignConnectionReconnect();
150            }
151    
152            return true;
153        }
154    
155        public void start() throws Exception {
156            if (started.compareAndSet(false, true)) {
157                init();
158                for (DestinationBridge bridge : inboundBridges) {
159                    bridge.start();
160                }
161                for (DestinationBridge bridge : outboundBridges) {
162                    bridge.start();
163                }
164                LOG.info("JMS Connector " + getName() + " Started");
165            }
166        }
167    
168        public void stop() throws Exception {
169            if (started.compareAndSet(true, false)) {
170    
171                ThreadPoolUtils.shutdown(connectionSerivce);
172                connectionSerivce = null;
173    
174                for (DestinationBridge bridge : inboundBridges) {
175                    bridge.stop();
176                }
177                for (DestinationBridge bridge : outboundBridges) {
178                    bridge.stop();
179                }
180                LOG.info("JMS Connector " + getName() + " Stopped");
181            }
182        }
183    
184        public void clearBridges() {
185            inboundBridges.clear();
186            outboundBridges.clear();
187            replyToBridges.clear();
188        }
189    
190        protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
191    
192        /**
193         * One way to configure the local connection - this is called by The
194         * BrokerService when the Connector is embedded
195         *
196         * @param service
197         */
198        public void setBrokerService(BrokerService service) {
199            embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
200        }
201    
202        public Connection getLocalConnection() {
203            return this.localConnection.get();
204        }
205    
206        public Connection getForeignConnection() {
207            return this.foreignConnection.get();
208        }
209    
210        /**
211         * @return Returns the jndiTemplate.
212         */
213        public JndiTemplate getJndiLocalTemplate() {
214            return jndiLocalTemplate;
215        }
216    
217        /**
218         * @param jndiTemplate The jndiTemplate to set.
219         */
220        public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
221            this.jndiLocalTemplate = jndiTemplate;
222        }
223    
224        /**
225         * @return Returns the jndiOutboundTemplate.
226         */
227        public JndiTemplate getJndiOutboundTemplate() {
228            return jndiOutboundTemplate;
229        }
230    
231        /**
232         * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
233         */
234        public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
235            this.jndiOutboundTemplate = jndiOutboundTemplate;
236        }
237    
238        /**
239         * @return Returns the inboundMessageConvertor.
240         */
241        public JmsMesageConvertor getInboundMessageConvertor() {
242            return inboundMessageConvertor;
243        }
244    
245        /**
246         * @param inboundMessageConvertor The inboundMessageConvertor to set.
247         */
248        public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
249            this.inboundMessageConvertor = jmsMessageConvertor;
250        }
251    
252        /**
253         * @return Returns the outboundMessageConvertor.
254         */
255        public JmsMesageConvertor getOutboundMessageConvertor() {
256            return outboundMessageConvertor;
257        }
258    
259        /**
260         * @param outboundMessageConvertor The outboundMessageConvertor to set.
261         */
262        public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
263            this.outboundMessageConvertor = outboundMessageConvertor;
264        }
265    
266        /**
267         * @return Returns the replyToDestinationCacheSize.
268         */
269        public int getReplyToDestinationCacheSize() {
270            return replyToDestinationCacheSize;
271        }
272    
273        /**
274         * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
275         */
276        public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
277            this.replyToDestinationCacheSize = replyToDestinationCacheSize;
278        }
279    
280        /**
281         * @return Returns the localPassword.
282         */
283        public String getLocalPassword() {
284            return localPassword;
285        }
286    
287        /**
288         * @param localPassword The localPassword to set.
289         */
290        public void setLocalPassword(String localPassword) {
291            this.localPassword = localPassword;
292        }
293    
294        /**
295         * @return Returns the localUsername.
296         */
297        public String getLocalUsername() {
298            return localUsername;
299        }
300    
301        /**
302         * @param localUsername The localUsername to set.
303         */
304        public void setLocalUsername(String localUsername) {
305            this.localUsername = localUsername;
306        }
307    
308        /**
309         * @return Returns the outboundPassword.
310         */
311        public String getOutboundPassword() {
312            return outboundPassword;
313        }
314    
315        /**
316         * @param outboundPassword The outboundPassword to set.
317         */
318        public void setOutboundPassword(String outboundPassword) {
319            this.outboundPassword = outboundPassword;
320        }
321    
322        /**
323         * @return Returns the outboundUsername.
324         */
325        public String getOutboundUsername() {
326            return outboundUsername;
327        }
328    
329        /**
330         * @param outboundUsername The outboundUsername to set.
331         */
332        public void setOutboundUsername(String outboundUsername) {
333            this.outboundUsername = outboundUsername;
334        }
335    
336        /**
337         * @return the outboundClientId
338         */
339        public String getOutboundClientId() {
340            return outboundClientId;
341        }
342    
343        /**
344         * @param outboundClientId the outboundClientId to set
345         */
346        public void setOutboundClientId(String outboundClientId) {
347            this.outboundClientId = outboundClientId;
348        }
349    
350        /**
351         * @return the localClientId
352         */
353        public String getLocalClientId() {
354            return localClientId;
355        }
356    
357        /**
358         * @param localClientId the localClientId to set
359         */
360        public void setLocalClientId(String localClientId) {
361            this.localClientId = localClientId;
362        }
363    
364        /**
365         * @return the currently configured reconnection policy.
366         */
367        public ReconnectionPolicy getReconnectionPolicy() {
368            return this.policy;
369        }
370    
371        /**
372         * @param policy The new reconnection policy this {@link JmsConnector} should use.
373         */
374        public void setReconnectionPolicy(ReconnectionPolicy policy) {
375            this.policy = policy;
376        }
377    
378        /**
379         * @return the preferJndiDestinationLookup
380         */
381        public boolean isPreferJndiDestinationLookup() {
382            return preferJndiDestinationLookup;
383        }
384    
385        /**
386         * Sets whether the connector should prefer to first try to find a destination in JNDI before
387         * using JMS semantics to create a Destination.  By default the connector will first use JMS
388         * semantics and then fall-back to JNDI lookup, setting this value to true will reverse that
389         * ordering.
390         *
391         * @param preferJndiDestinationLookup the preferJndiDestinationLookup to set
392         */
393        public void setPreferJndiDestinationLookup(boolean preferJndiDestinationLookup) {
394            this.preferJndiDestinationLookup = preferJndiDestinationLookup;
395        }
396    
397        /**
398         * @return returns true if the {@link JmsConnector} is connected to both brokers.
399         */
400        public boolean isConnected() {
401            return localConnection.get() != null && foreignConnection.get() != null;
402        }
403    
404        protected void addInboundBridge(DestinationBridge bridge) {
405            if (!inboundBridges.contains(bridge)) {
406                inboundBridges.add(bridge);
407            }
408        }
409    
410        protected void addOutboundBridge(DestinationBridge bridge) {
411            if (!outboundBridges.contains(bridge)) {
412                outboundBridges.add(bridge);
413            }
414        }
415    
416        protected void removeInboundBridge(DestinationBridge bridge) {
417            inboundBridges.remove(bridge);
418        }
419    
420        protected void removeOutboundBridge(DestinationBridge bridge) {
421            outboundBridges.remove(bridge);
422        }
423    
424        public String getName() {
425            if (name == null) {
426                name = "Connector:" + getNextId();
427            }
428            return name;
429        }
430    
431        public void setName(String name) {
432            this.name = name;
433        }
434    
435        private static synchronized int getNextId() {
436            return nextId++;
437        }
438    
439        public boolean isFailed() {
440            return this.failed.get();
441        }
442    
443        /**
444         * Performs the work of connection to the local side of the Connection.
445         * <p>
446         * This creates the initial connection to the local end of the {@link JmsConnector}
447         * and then sets up all the destination bridges with the information needed to bridge
448         * on the local side of the connection.
449         *
450         * @throws Exception if the connection cannot be established for any reason.
451         */
452        protected abstract void initializeLocalConnection() throws Exception;
453    
454        /**
455         * Performs the work of connection to the foreign side of the Connection.
456         * <p>
457         * This creates the initial connection to the foreign end of the {@link JmsConnector}
458         * and then sets up all the destination bridges with the information needed to bridge
459         * on the foreign side of the connection.
460         *
461         * @throws Exception if the connection cannot be established for any reason.
462         */
463        protected abstract void initializeForeignConnection() throws Exception;
464    
465        /**
466         * Callback method that the Destination bridges can use to report an exception to occurs
467         * during normal bridging operations.
468         *
469         * @param connection
470         *          The connection that was in use when the failure occured.
471         */
472        void handleConnectionFailure(Connection connection) {
473    
474            // Can happen if async exception listener kicks in at the same time.
475            if (connection == null || !this.started.get()) {
476                return;
477            }
478    
479            LOG.info("JmsConnector handling loss of connection [" + connection.toString() + "]");
480    
481            // TODO - How do we handle the re-wiring of replyToBridges in this case.
482            replyToBridges.clear();
483    
484            if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) {
485    
486                // Stop the inbound bridges when the foreign connection is dropped since
487                // the bridge has no consumer and needs to be restarted once a new connection
488                // to the foreign side is made.
489                for (DestinationBridge bridge : inboundBridges) {
490                    try {
491                        bridge.stop();
492                    } catch(Exception e) {
493                    }
494                }
495    
496                // We got here first and cleared the connection, now we queue a reconnect.
497                this.connectionSerivce.execute(new Runnable() {
498    
499                    @Override
500                    public void run() {
501                        try {
502                            doInitializeConnection(false);
503                        } catch (Exception e) {
504                            LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
505                        }
506                    }
507                });
508    
509            } else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) {
510    
511                // Stop the outbound bridges when the local connection is dropped since
512                // the bridge has no consumer and needs to be restarted once a new connection
513                // to the local side is made.
514                for (DestinationBridge bridge : outboundBridges) {
515                    try {
516                        bridge.stop();
517                    } catch(Exception e) {
518                    }
519                }
520    
521                // We got here first and cleared the connection, now we queue a reconnect.
522                this.connectionSerivce.execute(new Runnable() {
523    
524                    @Override
525                    public void run() {
526                        try {
527                            doInitializeConnection(true);
528                        } catch (Exception e) {
529                            LOG.error("Failed to initialize local connection for the JMSConnector", e);
530                        }
531                    }
532                });
533            }
534        }
535    
536        private void scheduleAsyncLocalConnectionReconnect() {
537            this.connectionSerivce.execute(new Runnable() {
538                @Override
539                public void run() {
540                    try {
541                        doInitializeConnection(true);
542                    } catch (Exception e) {
543                        LOG.error("Failed to initialize local connection for the JMSConnector", e);
544                    }
545                }
546            });
547        }
548    
549        private void scheduleAsyncForeignConnectionReconnect() {
550            this.connectionSerivce.execute(new Runnable() {
551                @Override
552                public void run() {
553                    try {
554                        doInitializeConnection(false);
555                    } catch (Exception e) {
556                        LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
557                    }
558                }
559            });
560        }
561    
562        private void doInitializeConnection(boolean local) throws Exception {
563    
564            int attempt = 0;
565    
566            final int maxRetries;
567            if (local) {
568                maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
569                                                           policy.getMaxReconnectAttempts();
570            } else {
571                maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
572                                                             policy.getMaxReconnectAttempts();
573            }
574    
575            do
576            {
577                if (attempt > 0) {
578                    try {
579                        Thread.sleep(policy.getNextDelay(attempt));
580                    } catch(InterruptedException e) {
581                    }
582                }
583    
584                if (connectionSerivce.isTerminating()) {
585                    return;
586                }
587    
588                try {
589    
590                    if (local) {
591                        initializeLocalConnection();
592                        localSideInitialized.set(true);
593                    } else {
594                        initializeForeignConnection();
595                        foreignSideInitialized.set(true);
596                    }
597    
598                    // Once we are connected we ensure all the bridges are started.
599                    if (localConnection.get() != null && foreignConnection.get() != null) {
600                        for (DestinationBridge bridge : inboundBridges) {
601                            bridge.start();
602                        }
603                        for (DestinationBridge bridge : outboundBridges) {
604                            bridge.start();
605                        }
606                    }
607    
608                    return;
609                } catch(Exception e) {
610                    LOG.debug("Failed to establish initial " + (local ? "local" : "foriegn") +
611                              " connection for JmsConnector [" + attempt + "]: " + e.getMessage());
612                }
613            }
614            while (maxRetries < ++attempt && !connectionSerivce.isTerminating());
615    
616            this.failed.set(true);
617        }
618    
619        private ThreadFactory factory = new ThreadFactory() {
620            public Thread newThread(Runnable runnable) {
621                Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: ");
622                thread.setDaemon(true);
623                return thread;
624            }
625        };
626    
627        private ThreadPoolExecutor createExecutor() {
628            ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
629            exec.allowCoreThreadTimeOut(true);
630            return exec;
631        }
632    }