001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network.jms;
018    
019    import java.util.Iterator;
020    import java.util.List;
021    import java.util.Map;
022    import java.util.concurrent.CopyOnWriteArrayList;
023    import java.util.concurrent.LinkedBlockingQueue;
024    import java.util.concurrent.ThreadFactory;
025    import java.util.concurrent.ThreadPoolExecutor;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.atomic.AtomicBoolean;
028    import java.util.concurrent.atomic.AtomicReference;
029    
030    import javax.jms.Connection;
031    import javax.jms.Destination;
032    import javax.jms.QueueConnection;
033    
034    import org.apache.activemq.ActiveMQConnectionFactory;
035    import org.apache.activemq.Service;
036    import org.apache.activemq.broker.BrokerService;
037    import org.apache.activemq.util.LRUCache;
038    import org.apache.activemq.util.ThreadPoolUtils;
039    import org.slf4j.Logger;
040    import org.slf4j.LoggerFactory;
041    
042    /**
043     * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
044     * JMS providers are still only in compliance with JMS v1.0.1 , this bridge itself
045     * aimed to be in compliance with the JMS 1.0.2 specification.
046     */
047    public abstract class JmsConnector implements Service {
048    
049        private static int nextId;
050        private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);
051    
052        protected boolean preferJndiDestinationLookup = false;
053        protected JndiLookupFactory jndiLocalTemplate;
054        protected JndiLookupFactory jndiOutboundTemplate;
055        protected JmsMesageConvertor inboundMessageConvertor;
056        protected JmsMesageConvertor outboundMessageConvertor;
057        protected AtomicBoolean initialized = new AtomicBoolean(false);
058        protected AtomicBoolean localSideInitialized = new AtomicBoolean(false);
059        protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false);
060        protected AtomicBoolean started = new AtomicBoolean(false);
061        protected AtomicBoolean failed = new AtomicBoolean();
062        protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>();
063        protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>();
064        protected ActiveMQConnectionFactory embeddedConnectionFactory;
065        protected int replyToDestinationCacheSize = 10000;
066        protected String outboundUsername;
067        protected String outboundPassword;
068        protected String localUsername;
069        protected String localPassword;
070        protected String outboundClientId;
071        protected String localClientId;
072        protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();
073    
074        private ReconnectionPolicy policy = new ReconnectionPolicy();
075        protected ThreadPoolExecutor connectionSerivce;
076        private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
077        private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
078        private String name;
079    
080        private static LRUCache<Destination, DestinationBridge> createLRUCache() {
081            return new LRUCache<Destination, DestinationBridge>() {
082                private static final long serialVersionUID = -7446792754185879286L;
083    
084                protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) {
085                    if (size() > maxCacheSize) {
086                        Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator();
087                        Map.Entry<Destination, DestinationBridge> lru = iter.next();
088                        remove(lru.getKey());
089                        DestinationBridge bridge = (DestinationBridge)lru.getValue();
090                        try {
091                            bridge.stop();
092                            LOG.info("Expired bridge: " + bridge);
093                        } catch (Exception e) {
094                            LOG.warn("stopping expired bridge" + bridge + " caused an exception", e);
095                        }
096                    }
097                    return false;
098                }
099            };
100        }
101    
102        public boolean init() {
103            boolean result = initialized.compareAndSet(false, true);
104            if (result) {
105                if (jndiLocalTemplate == null) {
106                    jndiLocalTemplate = new JndiLookupFactory();
107                }
108                if (jndiOutboundTemplate == null) {
109                    jndiOutboundTemplate = new JndiLookupFactory();
110                }
111                if (inboundMessageConvertor == null) {
112                    inboundMessageConvertor = new SimpleJmsMessageConvertor();
113                }
114                if (outboundMessageConvertor == null) {
115                    outboundMessageConvertor = new SimpleJmsMessageConvertor();
116                }
117                replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
118    
119                connectionSerivce = createExecutor();
120    
121                // Subclasses can override this to customize their own it.
122                result = doConnectorInit();
123            }
124            return result;
125        }
126    
127        protected boolean doConnectorInit() {
128    
129            // We try to make a connection via a sync call first so that the
130            // JmsConnector is fully initialized before the start call returns
131            // in order to avoid missing any messages that are dispatched
132            // immediately after startup.  If either side fails we queue an
133            // asynchronous task to manage the reconnect attempts.
134    
135            try {
136                initializeLocalConnection();
137                localSideInitialized.set(true);
138            } catch(Exception e) {
139                // Queue up the task to attempt the local connection.
140                scheduleAsyncLocalConnectionReconnect();
141            }
142    
143            try {
144                initializeForeignConnection();
145                foreignSideInitialized.set(true);
146            } catch(Exception e) {
147                // Queue up the task for the foreign connection now.
148                scheduleAsyncForeignConnectionReconnect();
149            }
150    
151            return true;
152        }
153    
154        public void start() throws Exception {
155            if (started.compareAndSet(false, true)) {
156                init();
157                for (DestinationBridge bridge : inboundBridges) {
158                    bridge.start();
159                }
160                for (DestinationBridge bridge : outboundBridges) {
161                    bridge.start();
162                }
163                LOG.info("JMS Connector " + getName() + " Started");
164            }
165        }
166    
167        public void stop() throws Exception {
168            if (started.compareAndSet(true, false)) {
169    
170                ThreadPoolUtils.shutdown(connectionSerivce);
171                connectionSerivce = null;
172    
173                for (DestinationBridge bridge : inboundBridges) {
174                    bridge.stop();
175                }
176                for (DestinationBridge bridge : outboundBridges) {
177                    bridge.stop();
178                }
179                LOG.info("JMS Connector " + getName() + " Stopped");
180            }
181        }
182    
183        public void clearBridges() {
184            inboundBridges.clear();
185            outboundBridges.clear();
186            replyToBridges.clear();
187        }
188    
189        protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
190    
191        /**
192         * One way to configure the local connection - this is called by The
193         * BrokerService when the Connector is embedded
194         *
195         * @param service
196         */
197        public void setBrokerService(BrokerService service) {
198            embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
199        }
200    
201        public Connection getLocalConnection() {
202            return this.localConnection.get();
203        }
204    
205        public Connection getForeignConnection() {
206            return this.foreignConnection.get();
207        }
208    
209        /**
210         * @return Returns the jndiTemplate.
211         */
212        public JndiLookupFactory getJndiLocalTemplate() {
213            return jndiLocalTemplate;
214        }
215    
216        /**
217         * @param jndiTemplate The jndiTemplate to set.
218         */
219        public void setJndiLocalTemplate(JndiLookupFactory jndiTemplate) {
220            this.jndiLocalTemplate = jndiTemplate;
221        }
222    
223        /**
224         * @return Returns the jndiOutboundTemplate.
225         */
226        public JndiLookupFactory getJndiOutboundTemplate() {
227            return jndiOutboundTemplate;
228        }
229    
230        /**
231         * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
232         */
233        public void setJndiOutboundTemplate(JndiLookupFactory jndiOutboundTemplate) {
234            this.jndiOutboundTemplate = jndiOutboundTemplate;
235        }
236    
237        /**
238         * @return Returns the inboundMessageConvertor.
239         */
240        public JmsMesageConvertor getInboundMessageConvertor() {
241            return inboundMessageConvertor;
242        }
243    
244        /**
245         * @param inboundMessageConvertor The inboundMessageConvertor to set.
246         */
247        public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
248            this.inboundMessageConvertor = jmsMessageConvertor;
249        }
250    
251        /**
252         * @return Returns the outboundMessageConvertor.
253         */
254        public JmsMesageConvertor getOutboundMessageConvertor() {
255            return outboundMessageConvertor;
256        }
257    
258        /**
259         * @param outboundMessageConvertor The outboundMessageConvertor to set.
260         */
261        public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
262            this.outboundMessageConvertor = outboundMessageConvertor;
263        }
264    
265        /**
266         * @return Returns the replyToDestinationCacheSize.
267         */
268        public int getReplyToDestinationCacheSize() {
269            return replyToDestinationCacheSize;
270        }
271    
272        /**
273         * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
274         */
275        public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
276            this.replyToDestinationCacheSize = replyToDestinationCacheSize;
277        }
278    
279        /**
280         * @return Returns the localPassword.
281         */
282        public String getLocalPassword() {
283            return localPassword;
284        }
285    
286        /**
287         * @param localPassword The localPassword to set.
288         */
289        public void setLocalPassword(String localPassword) {
290            this.localPassword = localPassword;
291        }
292    
293        /**
294         * @return Returns the localUsername.
295         */
296        public String getLocalUsername() {
297            return localUsername;
298        }
299    
300        /**
301         * @param localUsername The localUsername to set.
302         */
303        public void setLocalUsername(String localUsername) {
304            this.localUsername = localUsername;
305        }
306    
307        /**
308         * @return Returns the outboundPassword.
309         */
310        public String getOutboundPassword() {
311            return outboundPassword;
312        }
313    
314        /**
315         * @param outboundPassword The outboundPassword to set.
316         */
317        public void setOutboundPassword(String outboundPassword) {
318            this.outboundPassword = outboundPassword;
319        }
320    
321        /**
322         * @return Returns the outboundUsername.
323         */
324        public String getOutboundUsername() {
325            return outboundUsername;
326        }
327    
328        /**
329         * @param outboundUsername The outboundUsername to set.
330         */
331        public void setOutboundUsername(String outboundUsername) {
332            this.outboundUsername = outboundUsername;
333        }
334    
335        /**
336         * @return the outboundClientId
337         */
338        public String getOutboundClientId() {
339            return outboundClientId;
340        }
341    
342        /**
343         * @param outboundClientId the outboundClientId to set
344         */
345        public void setOutboundClientId(String outboundClientId) {
346            this.outboundClientId = outboundClientId;
347        }
348    
349        /**
350         * @return the localClientId
351         */
352        public String getLocalClientId() {
353            return localClientId;
354        }
355    
356        /**
357         * @param localClientId the localClientId to set
358         */
359        public void setLocalClientId(String localClientId) {
360            this.localClientId = localClientId;
361        }
362    
363        /**
364         * @return the currently configured reconnection policy.
365         */
366        public ReconnectionPolicy getReconnectionPolicy() {
367            return this.policy;
368        }
369    
370        /**
371         * @param policy The new reconnection policy this {@link JmsConnector} should use.
372         */
373        public void setReconnectionPolicy(ReconnectionPolicy policy) {
374            this.policy = policy;
375        }
376    
377        /**
378         * @return the preferJndiDestinationLookup
379         */
380        public boolean isPreferJndiDestinationLookup() {
381            return preferJndiDestinationLookup;
382        }
383    
384        /**
385         * Sets whether the connector should prefer to first try to find a destination in JNDI before
386         * using JMS semantics to create a Destination.  By default the connector will first use JMS
387         * semantics and then fall-back to JNDI lookup, setting this value to true will reverse that
388         * ordering.
389         *
390         * @param preferJndiDestinationLookup the preferJndiDestinationLookup to set
391         */
392        public void setPreferJndiDestinationLookup(boolean preferJndiDestinationLookup) {
393            this.preferJndiDestinationLookup = preferJndiDestinationLookup;
394        }
395    
396        /**
397         * @return returns true if the {@link JmsConnector} is connected to both brokers.
398         */
399        public boolean isConnected() {
400            return localConnection.get() != null && foreignConnection.get() != null;
401        }
402    
403        protected void addInboundBridge(DestinationBridge bridge) {
404            if (!inboundBridges.contains(bridge)) {
405                inboundBridges.add(bridge);
406            }
407        }
408    
409        protected void addOutboundBridge(DestinationBridge bridge) {
410            if (!outboundBridges.contains(bridge)) {
411                outboundBridges.add(bridge);
412            }
413        }
414    
415        protected void removeInboundBridge(DestinationBridge bridge) {
416            inboundBridges.remove(bridge);
417        }
418    
419        protected void removeOutboundBridge(DestinationBridge bridge) {
420            outboundBridges.remove(bridge);
421        }
422    
423        public String getName() {
424            if (name == null) {
425                name = "Connector:" + getNextId();
426            }
427            return name;
428        }
429    
430        public void setName(String name) {
431            this.name = name;
432        }
433    
434        private static synchronized int getNextId() {
435            return nextId++;
436        }
437    
438        public boolean isFailed() {
439            return this.failed.get();
440        }
441    
442        /**
443         * Performs the work of connection to the local side of the Connection.
444         * <p>
445         * This creates the initial connection to the local end of the {@link JmsConnector}
446         * and then sets up all the destination bridges with the information needed to bridge
447         * on the local side of the connection.
448         *
449         * @throws Exception if the connection cannot be established for any reason.
450         */
451        protected abstract void initializeLocalConnection() throws Exception;
452    
453        /**
454         * Performs the work of connection to the foreign side of the Connection.
455         * <p>
456         * This creates the initial connection to the foreign end of the {@link JmsConnector}
457         * and then sets up all the destination bridges with the information needed to bridge
458         * on the foreign side of the connection.
459         *
460         * @throws Exception if the connection cannot be established for any reason.
461         */
462        protected abstract void initializeForeignConnection() throws Exception;
463    
464        /**
465         * Callback method that the Destination bridges can use to report an exception to occurs
466         * during normal bridging operations.
467         *
468         * @param connection
469         *          The connection that was in use when the failure occured.
470         */
471        void handleConnectionFailure(Connection connection) {
472    
473            // Can happen if async exception listener kicks in at the same time.
474            if (connection == null || !this.started.get()) {
475                return;
476            }
477    
478            LOG.info("JmsConnector handling loss of connection [" + connection.toString() + "]");
479    
480            // TODO - How do we handle the re-wiring of replyToBridges in this case.
481            replyToBridges.clear();
482    
483            if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) {
484    
485                // Stop the inbound bridges when the foreign connection is dropped since
486                // the bridge has no consumer and needs to be restarted once a new connection
487                // to the foreign side is made.
488                for (DestinationBridge bridge : inboundBridges) {
489                    try {
490                        bridge.stop();
491                    } catch(Exception e) {
492                    }
493                }
494    
495                // We got here first and cleared the connection, now we queue a reconnect.
496                this.connectionSerivce.execute(new Runnable() {
497    
498                    @Override
499                    public void run() {
500                        try {
501                            doInitializeConnection(false);
502                        } catch (Exception e) {
503                            LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
504                        }
505                    }
506                });
507    
508            } else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) {
509    
510                // Stop the outbound bridges when the local connection is dropped since
511                // the bridge has no consumer and needs to be restarted once a new connection
512                // to the local side is made.
513                for (DestinationBridge bridge : outboundBridges) {
514                    try {
515                        bridge.stop();
516                    } catch(Exception e) {
517                    }
518                }
519    
520                // We got here first and cleared the connection, now we queue a reconnect.
521                this.connectionSerivce.execute(new Runnable() {
522    
523                    @Override
524                    public void run() {
525                        try {
526                            doInitializeConnection(true);
527                        } catch (Exception e) {
528                            LOG.error("Failed to initialize local connection for the JMSConnector", e);
529                        }
530                    }
531                });
532            }
533        }
534    
535        private void scheduleAsyncLocalConnectionReconnect() {
536            this.connectionSerivce.execute(new Runnable() {
537                @Override
538                public void run() {
539                    try {
540                        doInitializeConnection(true);
541                    } catch (Exception e) {
542                        LOG.error("Failed to initialize local connection for the JMSConnector", e);
543                    }
544                }
545            });
546        }
547    
548        private void scheduleAsyncForeignConnectionReconnect() {
549            this.connectionSerivce.execute(new Runnable() {
550                @Override
551                public void run() {
552                    try {
553                        doInitializeConnection(false);
554                    } catch (Exception e) {
555                        LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
556                    }
557                }
558            });
559        }
560    
561        private void doInitializeConnection(boolean local) throws Exception {
562    
563            int attempt = 0;
564    
565            final int maxRetries;
566            if (local) {
567                maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
568                                                           policy.getMaxReconnectAttempts();
569            } else {
570                maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
571                                                             policy.getMaxReconnectAttempts();
572            }
573    
574            do
575            {
576                if (attempt > 0) {
577                    try {
578                        Thread.sleep(policy.getNextDelay(attempt));
579                    } catch(InterruptedException e) {
580                    }
581                }
582    
583                if (connectionSerivce.isTerminating()) {
584                    return;
585                }
586    
587                try {
588    
589                    if (local) {
590                        initializeLocalConnection();
591                        localSideInitialized.set(true);
592                    } else {
593                        initializeForeignConnection();
594                        foreignSideInitialized.set(true);
595                    }
596    
597                    // Once we are connected we ensure all the bridges are started.
598                    if (localConnection.get() != null && foreignConnection.get() != null) {
599                        for (DestinationBridge bridge : inboundBridges) {
600                            bridge.start();
601                        }
602                        for (DestinationBridge bridge : outboundBridges) {
603                            bridge.start();
604                        }
605                    }
606    
607                    return;
608                } catch(Exception e) {
609                    LOG.debug("Failed to establish initial " + (local ? "local" : "foriegn") +
610                              " connection for JmsConnector [" + attempt + "]: " + e.getMessage());
611                }
612            }
613            while (maxRetries < ++attempt && !connectionSerivce.isTerminating());
614    
615            this.failed.set(true);
616        }
617    
618        private ThreadFactory factory = new ThreadFactory() {
619            public Thread newThread(Runnable runnable) {
620                Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: ");
621                thread.setDaemon(true);
622                return thread;
623            }
624        };
625    
626        private ThreadPoolExecutor createExecutor() {
627            ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
628            exec.allowCoreThreadTimeOut(true);
629            return exec;
630        }
631    }