001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network.jms;
018
019import static org.apache.activemq.network.jms.ReconnectionPolicy.INFINITE;
020
021import java.util.Iterator;
022import java.util.List;
023import java.util.Map;
024import java.util.concurrent.CopyOnWriteArrayList;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.ThreadFactory;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.TimeUnit;
029import java.util.concurrent.atomic.AtomicBoolean;
030import java.util.concurrent.atomic.AtomicReference;
031
032import javax.jms.Connection;
033import javax.jms.Destination;
034
035import org.apache.activemq.ActiveMQConnectionFactory;
036import org.apache.activemq.Service;
037import org.apache.activemq.broker.BrokerService;
038import org.apache.activemq.util.LRUCache;
039import org.apache.activemq.util.ThreadPoolUtils;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
045 * JMS providers are still only in compliance with JMS v1.0.1 , this bridge itself
046 * aimed to be in compliance with the JMS 1.0.2 specification.
047 */
048public abstract class JmsConnector implements Service {
049
050    private static int nextId;
051    private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);
052
053    protected boolean preferJndiDestinationLookup = false;
054    protected JndiLookupFactory jndiLocalTemplate;
055    protected JndiLookupFactory jndiOutboundTemplate;
056    protected JmsMesageConvertor inboundMessageConvertor;
057    protected JmsMesageConvertor outboundMessageConvertor;
058    protected AtomicBoolean initialized = new AtomicBoolean(false);
059    protected AtomicBoolean localSideInitialized = new AtomicBoolean(false);
060    protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false);
061    protected AtomicBoolean started = new AtomicBoolean(false);
062    protected AtomicBoolean failed = new AtomicBoolean();
063    protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>();
064    protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>();
065    protected ActiveMQConnectionFactory embeddedConnectionFactory;
066    protected int replyToDestinationCacheSize = 10000;
067    protected String outboundUsername;
068    protected String outboundPassword;
069    protected String localUsername;
070    protected String localPassword;
071    protected String outboundClientId;
072    protected String localClientId;
073    protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();
074
075    private ReconnectionPolicy policy = new ReconnectionPolicy();
076    protected ThreadPoolExecutor connectionService;
077    private final List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
078    private final List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
079    private String name;
080
081    private static LRUCache<Destination, DestinationBridge> createLRUCache() {
082        return new LRUCache<Destination, DestinationBridge>() {
083            private static final long serialVersionUID = -7446792754185879286L;
084
085            @Override
086            protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) {
087                if (size() > maxCacheSize) {
088                    Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator();
089                    Map.Entry<Destination, DestinationBridge> lru = iter.next();
090                    remove(lru.getKey());
091                    DestinationBridge bridge = lru.getValue();
092                    try {
093                        bridge.stop();
094                        LOG.info("Expired bridge: {}", bridge);
095                    } catch (Exception e) {
096                        LOG.warn("Stopping expired bridge {} caused an exception", bridge, e);
097                    }
098                }
099                return false;
100            }
101        };
102    }
103
104    public boolean init() {
105        boolean result = initialized.compareAndSet(false, true);
106        if (result) {
107            if (jndiLocalTemplate == null) {
108                jndiLocalTemplate = new JndiLookupFactory();
109            }
110            if (jndiOutboundTemplate == null) {
111                jndiOutboundTemplate = new JndiLookupFactory();
112            }
113            if (inboundMessageConvertor == null) {
114                inboundMessageConvertor = new SimpleJmsMessageConvertor();
115            }
116            if (outboundMessageConvertor == null) {
117                outboundMessageConvertor = new SimpleJmsMessageConvertor();
118            }
119            replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
120
121            connectionService = createExecutor();
122
123            // Subclasses can override this to customize their own it.
124            result = doConnectorInit();
125        }
126        return result;
127    }
128
129    protected boolean doConnectorInit() {
130
131        // We try to make a connection via a sync call first so that the
132        // JmsConnector is fully initialized before the start call returns
133        // in order to avoid missing any messages that are dispatched
134        // immediately after startup.  If either side fails we queue an
135        // asynchronous task to manage the reconnect attempts.
136
137        try {
138            initializeLocalConnection();
139            localSideInitialized.set(true);
140        } catch(Exception e) {
141            // Queue up the task to attempt the local connection.
142            scheduleAsyncLocalConnectionReconnect();
143        }
144
145        try {
146            initializeForeignConnection();
147            foreignSideInitialized.set(true);
148        } catch(Exception e) {
149            // Queue up the task for the foreign connection now.
150            scheduleAsyncForeignConnectionReconnect();
151        }
152
153        return true;
154    }
155
156    @Override
157    public void start() throws Exception {
158        if (started.compareAndSet(false, true)) {
159            init();
160            for (DestinationBridge bridge : inboundBridges) {
161                bridge.start();
162            }
163            for (DestinationBridge bridge : outboundBridges) {
164                bridge.start();
165            }
166            LOG.info("JMS Connector {} started", getName());
167        }
168    }
169
170    @Override
171    public void stop() throws Exception {
172        if (started.compareAndSet(true, false)) {
173
174            ThreadPoolUtils.shutdown(connectionService);
175            connectionService = null;
176
177            if (foreignConnection.get() != null) {
178                try {
179                    foreignConnection.get().close();
180                } catch (Exception e) {
181                }
182            }
183
184            if (localConnection.get() != null) {
185                try {
186                    localConnection.get().close();
187                } catch (Exception e) {
188                }
189            }
190
191            for (DestinationBridge bridge : inboundBridges) {
192                bridge.stop();
193            }
194            for (DestinationBridge bridge : outboundBridges) {
195                bridge.stop();
196            }
197            LOG.info("JMS Connector {} stopped", getName());
198        }
199    }
200
201    public void clearBridges() {
202        inboundBridges.clear();
203        outboundBridges.clear();
204        replyToBridges.clear();
205    }
206
207    protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
208
209    /**
210     * One way to configure the local connection - this is called by The
211     * BrokerService when the Connector is embedded
212     *
213     * @param service
214     */
215    public void setBrokerService(BrokerService service) {
216        embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
217    }
218
219    public Connection getLocalConnection() {
220        return this.localConnection.get();
221    }
222
223    public Connection getForeignConnection() {
224        return this.foreignConnection.get();
225    }
226
227    /**
228     * @return Returns the jndiTemplate.
229     */
230    public JndiLookupFactory getJndiLocalTemplate() {
231        return jndiLocalTemplate;
232    }
233
234    /**
235     * @param jndiTemplate The jndiTemplate to set.
236     */
237    public void setJndiLocalTemplate(JndiLookupFactory jndiTemplate) {
238        this.jndiLocalTemplate = jndiTemplate;
239    }
240
241    /**
242     * @return Returns the jndiOutboundTemplate.
243     */
244    public JndiLookupFactory getJndiOutboundTemplate() {
245        return jndiOutboundTemplate;
246    }
247
248    /**
249     * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
250     */
251    public void setJndiOutboundTemplate(JndiLookupFactory jndiOutboundTemplate) {
252        this.jndiOutboundTemplate = jndiOutboundTemplate;
253    }
254
255    /**
256     * @return Returns the inboundMessageConvertor.
257     */
258    public JmsMesageConvertor getInboundMessageConvertor() {
259        return inboundMessageConvertor;
260    }
261
262    /**
263     * @param jmsMessageConvertor The jmsMessageConvertor to set.
264     */
265    public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
266        this.inboundMessageConvertor = jmsMessageConvertor;
267    }
268
269    /**
270     * @return Returns the outboundMessageConvertor.
271     */
272    public JmsMesageConvertor getOutboundMessageConvertor() {
273        return outboundMessageConvertor;
274    }
275
276    /**
277     * @param outboundMessageConvertor The outboundMessageConvertor to set.
278     */
279    public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
280        this.outboundMessageConvertor = outboundMessageConvertor;
281    }
282
283    /**
284     * @return Returns the replyToDestinationCacheSize.
285     */
286    public int getReplyToDestinationCacheSize() {
287        return replyToDestinationCacheSize;
288    }
289
290    /**
291     * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
292     */
293    public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
294        this.replyToDestinationCacheSize = replyToDestinationCacheSize;
295    }
296
297    /**
298     * @return Returns the localPassword.
299     */
300    public String getLocalPassword() {
301        return localPassword;
302    }
303
304    /**
305     * @param localPassword The localPassword to set.
306     */
307    public void setLocalPassword(String localPassword) {
308        this.localPassword = localPassword;
309    }
310
311    /**
312     * @return Returns the localUsername.
313     */
314    public String getLocalUsername() {
315        return localUsername;
316    }
317
318    /**
319     * @param localUsername The localUsername to set.
320     */
321    public void setLocalUsername(String localUsername) {
322        this.localUsername = localUsername;
323    }
324
325    /**
326     * @return Returns the outboundPassword.
327     */
328    public String getOutboundPassword() {
329        return outboundPassword;
330    }
331
332    /**
333     * @param outboundPassword The outboundPassword to set.
334     */
335    public void setOutboundPassword(String outboundPassword) {
336        this.outboundPassword = outboundPassword;
337    }
338
339    /**
340     * @return Returns the outboundUsername.
341     */
342    public String getOutboundUsername() {
343        return outboundUsername;
344    }
345
346    /**
347     * @param outboundUsername The outboundUsername to set.
348     */
349    public void setOutboundUsername(String outboundUsername) {
350        this.outboundUsername = outboundUsername;
351    }
352
353    /**
354     * @return the outboundClientId
355     */
356    public String getOutboundClientId() {
357        return outboundClientId;
358    }
359
360    /**
361     * @param outboundClientId the outboundClientId to set
362     */
363    public void setOutboundClientId(String outboundClientId) {
364        this.outboundClientId = outboundClientId;
365    }
366
367    /**
368     * @return the localClientId
369     */
370    public String getLocalClientId() {
371        return localClientId;
372    }
373
374    /**
375     * @param localClientId the localClientId to set
376     */
377    public void setLocalClientId(String localClientId) {
378        this.localClientId = localClientId;
379    }
380
381    /**
382     * @return the currently configured reconnection policy.
383     */
384    public ReconnectionPolicy getReconnectionPolicy() {
385        return this.policy;
386    }
387
388    /**
389     * @param policy The new reconnection policy this {@link JmsConnector} should use.
390     */
391    public void setReconnectionPolicy(ReconnectionPolicy policy) {
392        this.policy = policy;
393    }
394
395    /**
396     * @return the preferJndiDestinationLookup
397     */
398    public boolean isPreferJndiDestinationLookup() {
399        return preferJndiDestinationLookup;
400    }
401
402    /**
403     * Sets whether the connector should prefer to first try to find a destination in JNDI before
404     * using JMS semantics to create a Destination.  By default the connector will first use JMS
405     * semantics and then fall-back to JNDI lookup, setting this value to true will reverse that
406     * ordering.
407     *
408     * @param preferJndiDestinationLookup the preferJndiDestinationLookup to set
409     */
410    public void setPreferJndiDestinationLookup(boolean preferJndiDestinationLookup) {
411        this.preferJndiDestinationLookup = preferJndiDestinationLookup;
412    }
413
414    /**
415     * @return returns true if the {@link JmsConnector} is connected to both brokers.
416     */
417    public boolean isConnected() {
418        return localConnection.get() != null && foreignConnection.get() != null;
419    }
420
421    protected void addInboundBridge(DestinationBridge bridge) {
422        if (!inboundBridges.contains(bridge)) {
423            inboundBridges.add(bridge);
424        }
425    }
426
427    protected void addOutboundBridge(DestinationBridge bridge) {
428        if (!outboundBridges.contains(bridge)) {
429            outboundBridges.add(bridge);
430        }
431    }
432
433    protected void removeInboundBridge(DestinationBridge bridge) {
434        inboundBridges.remove(bridge);
435    }
436
437    protected void removeOutboundBridge(DestinationBridge bridge) {
438        outboundBridges.remove(bridge);
439    }
440
441    public String getName() {
442        if (name == null) {
443            name = "Connector:" + getNextId();
444        }
445        return name;
446    }
447
448    public void setName(String name) {
449        this.name = name;
450    }
451
452    private static synchronized int getNextId() {
453        return nextId++;
454    }
455
456    public boolean isFailed() {
457        return this.failed.get();
458    }
459
460    /**
461     * Performs the work of connection to the local side of the Connection.
462     * <p>
463     * This creates the initial connection to the local end of the {@link JmsConnector}
464     * and then sets up all the destination bridges with the information needed to bridge
465     * on the local side of the connection.
466     *
467     * @throws Exception if the connection cannot be established for any reason.
468     */
469    protected abstract void initializeLocalConnection() throws Exception;
470
471    /**
472     * Performs the work of connection to the foreign side of the Connection.
473     * <p>
474     * This creates the initial connection to the foreign end of the {@link JmsConnector}
475     * and then sets up all the destination bridges with the information needed to bridge
476     * on the foreign side of the connection.
477     *
478     * @throws Exception if the connection cannot be established for any reason.
479     */
480    protected abstract void initializeForeignConnection() throws Exception;
481
482    /**
483     * Callback method that the Destination bridges can use to report an exception to occurs
484     * during normal bridging operations.
485     *
486     * @param connection
487     *          The connection that was in use when the failure occured.
488     */
489    void handleConnectionFailure(Connection connection) {
490
491        // Can happen if async exception listener kicks in at the same time.
492        if (connection == null || !this.started.get()) {
493            return;
494        }
495
496        LOG.info("JmsConnector handling loss of connection [{}]", connection.toString());
497
498        // TODO - How do we handle the re-wiring of replyToBridges in this case.
499        replyToBridges.clear();
500
501        if (this.foreignConnection.compareAndSet(connection, null)) {
502
503            // Stop the inbound bridges when the foreign connection is dropped since
504            // the bridge has no consumer and needs to be restarted once a new connection
505            // to the foreign side is made.
506            for (DestinationBridge bridge : inboundBridges) {
507                try {
508                    bridge.stop();
509                } catch(Exception e) {
510                }
511            }
512
513            // We got here first and cleared the connection, now we queue a reconnect.
514            this.connectionService.execute(new Runnable() {
515
516                @Override
517                public void run() {
518                    try {
519                        doInitializeConnection(false);
520                    } catch (Exception e) {
521                        LOG.error("Failed to initialize foreign connection for the JMSConnector", e);
522                    }
523                }
524            });
525
526        } else if (this.localConnection.compareAndSet(connection, null)) {
527
528            // Stop the outbound bridges when the local connection is dropped since
529            // the bridge has no consumer and needs to be restarted once a new connection
530            // to the local side is made.
531            for (DestinationBridge bridge : outboundBridges) {
532                try {
533                    bridge.stop();
534                } catch(Exception e) {
535                }
536            }
537
538            // We got here first and cleared the connection, now we queue a reconnect.
539            this.connectionService.execute(new Runnable() {
540
541                @Override
542                public void run() {
543                    try {
544                        doInitializeConnection(true);
545                    } catch (Exception e) {
546                        LOG.error("Failed to initialize local connection for the JMSConnector", e);
547                    }
548                }
549            });
550        }
551    }
552
553    private void scheduleAsyncLocalConnectionReconnect() {
554        this.connectionService.execute(new Runnable() {
555            @Override
556            public void run() {
557                try {
558                    doInitializeConnection(true);
559                } catch (Exception e) {
560                    LOG.error("Failed to initialize local connection for the JMSConnector", e);
561                }
562            }
563        });
564    }
565
566    private void scheduleAsyncForeignConnectionReconnect() {
567        this.connectionService.execute(new Runnable() {
568            @Override
569            public void run() {
570                try {
571                    doInitializeConnection(false);
572                } catch (Exception e) {
573                    LOG.error("Failed to initialize foreign connection for the JMSConnector", e);
574                }
575            }
576        });
577    }
578
579    private void doInitializeConnection(boolean local) throws Exception {
580
581        ThreadPoolExecutor connectionService = this.connectionService;
582        int attempt = 0;
583
584        final int maxRetries;
585        if (local) {
586            maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
587                                                       policy.getMaxReconnectAttempts();
588        } else {
589            maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
590                                                         policy.getMaxReconnectAttempts();
591        }
592
593        do {
594            if (attempt > 0) {
595                try {
596                    Thread.sleep(policy.getNextDelay(attempt));
597                } catch(InterruptedException e) {
598                }
599            }
600
601            if (connectionService.isTerminating()) {
602                return;
603            }
604
605            try {
606
607                if (local) {
608                    initializeLocalConnection();
609                    localSideInitialized.set(true);
610                } else {
611                    initializeForeignConnection();
612                    foreignSideInitialized.set(true);
613                }
614
615                // Once we are connected we ensure all the bridges are started.
616                if (localConnection.get() != null && foreignConnection.get() != null) {
617                    for (DestinationBridge bridge : inboundBridges) {
618                        bridge.start();
619                    }
620                    for (DestinationBridge bridge : outboundBridges) {
621                        bridge.start();
622                    }
623                }
624
625                return;
626            } catch(Exception e) {
627                LOG.debug("Failed to establish initial {} connection for JmsConnector [{}]", new Object[]{ (local ? "local" : "foreign"), attempt }, e);
628            }
629        }
630        while ((maxRetries == INFINITE || maxRetries > ++attempt) && !connectionService.isShutdown());
631
632        this.failed.set(true);
633    }
634
635    private final ThreadFactory factory = new ThreadFactory() {
636        @Override
637        public Thread newThread(Runnable runnable) {
638            Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: ");
639            thread.setDaemon(true);
640            return thread;
641        }
642    };
643
644    private ThreadPoolExecutor createExecutor() {
645        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
646        exec.allowCoreThreadTimeOut(true);
647        return exec;
648    }
649}