001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network.jms;
018
019 import java.util.Iterator;
020 import java.util.List;
021 import java.util.Map;
022 import java.util.concurrent.CopyOnWriteArrayList;
023 import java.util.concurrent.LinkedBlockingQueue;
024 import java.util.concurrent.ThreadFactory;
025 import java.util.concurrent.ThreadPoolExecutor;
026 import java.util.concurrent.TimeUnit;
027 import java.util.concurrent.atomic.AtomicBoolean;
028 import java.util.concurrent.atomic.AtomicReference;
029
030 import javax.jms.Connection;
031 import javax.jms.Destination;
032 import javax.jms.QueueConnection;
033
034 import org.apache.activemq.ActiveMQConnectionFactory;
035 import org.apache.activemq.Service;
036 import org.apache.activemq.broker.BrokerService;
037 import org.apache.activemq.util.LRUCache;
038 import org.apache.activemq.util.ThreadPoolUtils;
039 import org.slf4j.Logger;
040 import org.slf4j.LoggerFactory;
041 import org.springframework.jndi.JndiTemplate;
042
043 /**
044 * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
045 * JMS providers are still only in compliance with JMS v1.0.1 , this bridge itself
046 * aimed to be in compliance with the JMS 1.0.2 specification.
047 */
048 public abstract class JmsConnector implements Service {
049
050 private static int nextId;
051 private static final Logger LOG = LoggerFactory.getLogger(JmsConnector.class);
052
053 protected boolean preferJndiDestinationLookup = false;
054 protected JndiTemplate jndiLocalTemplate;
055 protected JndiTemplate jndiOutboundTemplate;
056 protected JmsMesageConvertor inboundMessageConvertor;
057 protected JmsMesageConvertor outboundMessageConvertor;
058 protected AtomicBoolean initialized = new AtomicBoolean(false);
059 protected AtomicBoolean localSideInitialized = new AtomicBoolean(false);
060 protected AtomicBoolean foreignSideInitialized = new AtomicBoolean(false);
061 protected AtomicBoolean started = new AtomicBoolean(false);
062 protected AtomicBoolean failed = new AtomicBoolean();
063 protected AtomicReference<Connection> foreignConnection = new AtomicReference<Connection>();
064 protected AtomicReference<Connection> localConnection = new AtomicReference<Connection>();
065 protected ActiveMQConnectionFactory embeddedConnectionFactory;
066 protected int replyToDestinationCacheSize = 10000;
067 protected String outboundUsername;
068 protected String outboundPassword;
069 protected String localUsername;
070 protected String localPassword;
071 protected String outboundClientId;
072 protected String localClientId;
073 protected LRUCache<Destination, DestinationBridge> replyToBridges = createLRUCache();
074
075 private ReconnectionPolicy policy = new ReconnectionPolicy();
076 protected ThreadPoolExecutor connectionSerivce;
077 private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
078 private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
079 private String name;
080
081 private static LRUCache<Destination, DestinationBridge> createLRUCache() {
082 return new LRUCache<Destination, DestinationBridge>() {
083 private static final long serialVersionUID = -7446792754185879286L;
084
085 protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) {
086 if (size() > maxCacheSize) {
087 Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator();
088 Map.Entry<Destination, DestinationBridge> lru = iter.next();
089 remove(lru.getKey());
090 DestinationBridge bridge = (DestinationBridge)lru.getValue();
091 try {
092 bridge.stop();
093 LOG.info("Expired bridge: " + bridge);
094 } catch (Exception e) {
095 LOG.warn("stopping expired bridge" + bridge + " caused an exception", e);
096 }
097 }
098 return false;
099 }
100 };
101 }
102
103 public boolean init() {
104 boolean result = initialized.compareAndSet(false, true);
105 if (result) {
106 if (jndiLocalTemplate == null) {
107 jndiLocalTemplate = new JndiTemplate();
108 }
109 if (jndiOutboundTemplate == null) {
110 jndiOutboundTemplate = new JndiTemplate();
111 }
112 if (inboundMessageConvertor == null) {
113 inboundMessageConvertor = new SimpleJmsMessageConvertor();
114 }
115 if (outboundMessageConvertor == null) {
116 outboundMessageConvertor = new SimpleJmsMessageConvertor();
117 }
118 replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
119
120 connectionSerivce = createExecutor();
121
122 // Subclasses can override this to customize their own it.
123 result = doConnectorInit();
124 }
125 return result;
126 }
127
128 protected boolean doConnectorInit() {
129
130 // We try to make a connection via a sync call first so that the
131 // JmsConnector is fully initialized before the start call returns
132 // in order to avoid missing any messages that are dispatched
133 // immediately after startup. If either side fails we queue an
134 // asynchronous task to manage the reconnect attempts.
135
136 try {
137 initializeLocalConnection();
138 localSideInitialized.set(true);
139 } catch(Exception e) {
140 // Queue up the task to attempt the local connection.
141 scheduleAsyncLocalConnectionReconnect();
142 }
143
144 try {
145 initializeForeignConnection();
146 foreignSideInitialized.set(true);
147 } catch(Exception e) {
148 // Queue up the task for the foreign connection now.
149 scheduleAsyncForeignConnectionReconnect();
150 }
151
152 return true;
153 }
154
155 public void start() throws Exception {
156 if (started.compareAndSet(false, true)) {
157 init();
158 for (DestinationBridge bridge : inboundBridges) {
159 bridge.start();
160 }
161 for (DestinationBridge bridge : outboundBridges) {
162 bridge.start();
163 }
164 LOG.info("JMS Connector " + getName() + " Started");
165 }
166 }
167
168 public void stop() throws Exception {
169 if (started.compareAndSet(true, false)) {
170
171 ThreadPoolUtils.shutdown(connectionSerivce);
172 connectionSerivce = null;
173
174 for (DestinationBridge bridge : inboundBridges) {
175 bridge.stop();
176 }
177 for (DestinationBridge bridge : outboundBridges) {
178 bridge.stop();
179 }
180 LOG.info("JMS Connector " + getName() + " Stopped");
181 }
182 }
183
184 public void clearBridges() {
185 inboundBridges.clear();
186 outboundBridges.clear();
187 replyToBridges.clear();
188 }
189
190 protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
191
192 /**
193 * One way to configure the local connection - this is called by The
194 * BrokerService when the Connector is embedded
195 *
196 * @param service
197 */
198 public void setBrokerService(BrokerService service) {
199 embeddedConnectionFactory = new ActiveMQConnectionFactory(service.getVmConnectorURI());
200 }
201
202 public Connection getLocalConnection() {
203 return this.localConnection.get();
204 }
205
206 public Connection getForeignConnection() {
207 return this.foreignConnection.get();
208 }
209
210 /**
211 * @return Returns the jndiTemplate.
212 */
213 public JndiTemplate getJndiLocalTemplate() {
214 return jndiLocalTemplate;
215 }
216
217 /**
218 * @param jndiTemplate The jndiTemplate to set.
219 */
220 public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
221 this.jndiLocalTemplate = jndiTemplate;
222 }
223
224 /**
225 * @return Returns the jndiOutboundTemplate.
226 */
227 public JndiTemplate getJndiOutboundTemplate() {
228 return jndiOutboundTemplate;
229 }
230
231 /**
232 * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
233 */
234 public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
235 this.jndiOutboundTemplate = jndiOutboundTemplate;
236 }
237
238 /**
239 * @return Returns the inboundMessageConvertor.
240 */
241 public JmsMesageConvertor getInboundMessageConvertor() {
242 return inboundMessageConvertor;
243 }
244
245 /**
246 * @param inboundMessageConvertor The inboundMessageConvertor to set.
247 */
248 public void setInboundMessageConvertor(JmsMesageConvertor jmsMessageConvertor) {
249 this.inboundMessageConvertor = jmsMessageConvertor;
250 }
251
252 /**
253 * @return Returns the outboundMessageConvertor.
254 */
255 public JmsMesageConvertor getOutboundMessageConvertor() {
256 return outboundMessageConvertor;
257 }
258
259 /**
260 * @param outboundMessageConvertor The outboundMessageConvertor to set.
261 */
262 public void setOutboundMessageConvertor(JmsMesageConvertor outboundMessageConvertor) {
263 this.outboundMessageConvertor = outboundMessageConvertor;
264 }
265
266 /**
267 * @return Returns the replyToDestinationCacheSize.
268 */
269 public int getReplyToDestinationCacheSize() {
270 return replyToDestinationCacheSize;
271 }
272
273 /**
274 * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
275 */
276 public void setReplyToDestinationCacheSize(int replyToDestinationCacheSize) {
277 this.replyToDestinationCacheSize = replyToDestinationCacheSize;
278 }
279
280 /**
281 * @return Returns the localPassword.
282 */
283 public String getLocalPassword() {
284 return localPassword;
285 }
286
287 /**
288 * @param localPassword The localPassword to set.
289 */
290 public void setLocalPassword(String localPassword) {
291 this.localPassword = localPassword;
292 }
293
294 /**
295 * @return Returns the localUsername.
296 */
297 public String getLocalUsername() {
298 return localUsername;
299 }
300
301 /**
302 * @param localUsername The localUsername to set.
303 */
304 public void setLocalUsername(String localUsername) {
305 this.localUsername = localUsername;
306 }
307
308 /**
309 * @return Returns the outboundPassword.
310 */
311 public String getOutboundPassword() {
312 return outboundPassword;
313 }
314
315 /**
316 * @param outboundPassword The outboundPassword to set.
317 */
318 public void setOutboundPassword(String outboundPassword) {
319 this.outboundPassword = outboundPassword;
320 }
321
322 /**
323 * @return Returns the outboundUsername.
324 */
325 public String getOutboundUsername() {
326 return outboundUsername;
327 }
328
329 /**
330 * @param outboundUsername The outboundUsername to set.
331 */
332 public void setOutboundUsername(String outboundUsername) {
333 this.outboundUsername = outboundUsername;
334 }
335
336 /**
337 * @return the outboundClientId
338 */
339 public String getOutboundClientId() {
340 return outboundClientId;
341 }
342
343 /**
344 * @param outboundClientId the outboundClientId to set
345 */
346 public void setOutboundClientId(String outboundClientId) {
347 this.outboundClientId = outboundClientId;
348 }
349
350 /**
351 * @return the localClientId
352 */
353 public String getLocalClientId() {
354 return localClientId;
355 }
356
357 /**
358 * @param localClientId the localClientId to set
359 */
360 public void setLocalClientId(String localClientId) {
361 this.localClientId = localClientId;
362 }
363
364 /**
365 * @return the currently configured reconnection policy.
366 */
367 public ReconnectionPolicy getReconnectionPolicy() {
368 return this.policy;
369 }
370
371 /**
372 * @param policy The new reconnection policy this {@link JmsConnector} should use.
373 */
374 public void setReconnectionPolicy(ReconnectionPolicy policy) {
375 this.policy = policy;
376 }
377
378 /**
379 * @return the preferJndiDestinationLookup
380 */
381 public boolean isPreferJndiDestinationLookup() {
382 return preferJndiDestinationLookup;
383 }
384
385 /**
386 * Sets whether the connector should prefer to first try to find a destination in JNDI before
387 * using JMS semantics to create a Destination. By default the connector will first use JMS
388 * semantics and then fall-back to JNDI lookup, setting this value to true will reverse that
389 * ordering.
390 *
391 * @param preferJndiDestinationLookup the preferJndiDestinationLookup to set
392 */
393 public void setPreferJndiDestinationLookup(boolean preferJndiDestinationLookup) {
394 this.preferJndiDestinationLookup = preferJndiDestinationLookup;
395 }
396
397 /**
398 * @return returns true if the {@link JmsConnector} is connected to both brokers.
399 */
400 public boolean isConnected() {
401 return localConnection.get() != null && foreignConnection.get() != null;
402 }
403
404 protected void addInboundBridge(DestinationBridge bridge) {
405 if (!inboundBridges.contains(bridge)) {
406 inboundBridges.add(bridge);
407 }
408 }
409
410 protected void addOutboundBridge(DestinationBridge bridge) {
411 if (!outboundBridges.contains(bridge)) {
412 outboundBridges.add(bridge);
413 }
414 }
415
416 protected void removeInboundBridge(DestinationBridge bridge) {
417 inboundBridges.remove(bridge);
418 }
419
420 protected void removeOutboundBridge(DestinationBridge bridge) {
421 outboundBridges.remove(bridge);
422 }
423
424 public String getName() {
425 if (name == null) {
426 name = "Connector:" + getNextId();
427 }
428 return name;
429 }
430
431 public void setName(String name) {
432 this.name = name;
433 }
434
435 private static synchronized int getNextId() {
436 return nextId++;
437 }
438
439 public boolean isFailed() {
440 return this.failed.get();
441 }
442
443 /**
444 * Performs the work of connection to the local side of the Connection.
445 * <p>
446 * This creates the initial connection to the local end of the {@link JmsConnector}
447 * and then sets up all the destination bridges with the information needed to bridge
448 * on the local side of the connection.
449 *
450 * @throws Exception if the connection cannot be established for any reason.
451 */
452 protected abstract void initializeLocalConnection() throws Exception;
453
454 /**
455 * Performs the work of connection to the foreign side of the Connection.
456 * <p>
457 * This creates the initial connection to the foreign end of the {@link JmsConnector}
458 * and then sets up all the destination bridges with the information needed to bridge
459 * on the foreign side of the connection.
460 *
461 * @throws Exception if the connection cannot be established for any reason.
462 */
463 protected abstract void initializeForeignConnection() throws Exception;
464
465 /**
466 * Callback method that the Destination bridges can use to report an exception to occurs
467 * during normal bridging operations.
468 *
469 * @param connection
470 * The connection that was in use when the failure occured.
471 */
472 void handleConnectionFailure(Connection connection) {
473
474 // Can happen if async exception listener kicks in at the same time.
475 if (connection == null || !this.started.get()) {
476 return;
477 }
478
479 LOG.info("JmsConnector handling loss of connection [" + connection.toString() + "]");
480
481 // TODO - How do we handle the re-wiring of replyToBridges in this case.
482 replyToBridges.clear();
483
484 if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) {
485
486 // Stop the inbound bridges when the foreign connection is dropped since
487 // the bridge has no consumer and needs to be restarted once a new connection
488 // to the foreign side is made.
489 for (DestinationBridge bridge : inboundBridges) {
490 try {
491 bridge.stop();
492 } catch(Exception e) {
493 }
494 }
495
496 // We got here first and cleared the connection, now we queue a reconnect.
497 this.connectionSerivce.execute(new Runnable() {
498
499 @Override
500 public void run() {
501 try {
502 doInitializeConnection(false);
503 } catch (Exception e) {
504 LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
505 }
506 }
507 });
508
509 } else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) {
510
511 // Stop the outbound bridges when the local connection is dropped since
512 // the bridge has no consumer and needs to be restarted once a new connection
513 // to the local side is made.
514 for (DestinationBridge bridge : outboundBridges) {
515 try {
516 bridge.stop();
517 } catch(Exception e) {
518 }
519 }
520
521 // We got here first and cleared the connection, now we queue a reconnect.
522 this.connectionSerivce.execute(new Runnable() {
523
524 @Override
525 public void run() {
526 try {
527 doInitializeConnection(true);
528 } catch (Exception e) {
529 LOG.error("Failed to initialize local connection for the JMSConnector", e);
530 }
531 }
532 });
533 }
534 }
535
536 private void scheduleAsyncLocalConnectionReconnect() {
537 this.connectionSerivce.execute(new Runnable() {
538 @Override
539 public void run() {
540 try {
541 doInitializeConnection(true);
542 } catch (Exception e) {
543 LOG.error("Failed to initialize local connection for the JMSConnector", e);
544 }
545 }
546 });
547 }
548
549 private void scheduleAsyncForeignConnectionReconnect() {
550 this.connectionSerivce.execute(new Runnable() {
551 @Override
552 public void run() {
553 try {
554 doInitializeConnection(false);
555 } catch (Exception e) {
556 LOG.error("Failed to initialize forgein connection for the JMSConnector", e);
557 }
558 }
559 });
560 }
561
562 private void doInitializeConnection(boolean local) throws Exception {
563
564 int attempt = 0;
565
566 final int maxRetries;
567 if (local) {
568 maxRetries = !localSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
569 policy.getMaxReconnectAttempts();
570 } else {
571 maxRetries = !foreignSideInitialized.get() ? policy.getMaxInitialConnectAttempts() :
572 policy.getMaxReconnectAttempts();
573 }
574
575 do
576 {
577 if (attempt > 0) {
578 try {
579 Thread.sleep(policy.getNextDelay(attempt));
580 } catch(InterruptedException e) {
581 }
582 }
583
584 if (connectionSerivce.isTerminating()) {
585 return;
586 }
587
588 try {
589
590 if (local) {
591 initializeLocalConnection();
592 localSideInitialized.set(true);
593 } else {
594 initializeForeignConnection();
595 foreignSideInitialized.set(true);
596 }
597
598 // Once we are connected we ensure all the bridges are started.
599 if (localConnection.get() != null && foreignConnection.get() != null) {
600 for (DestinationBridge bridge : inboundBridges) {
601 bridge.start();
602 }
603 for (DestinationBridge bridge : outboundBridges) {
604 bridge.start();
605 }
606 }
607
608 return;
609 } catch(Exception e) {
610 LOG.debug("Failed to establish initial " + (local ? "local" : "foriegn") +
611 " connection for JmsConnector [" + attempt + "]: " + e.getMessage());
612 }
613 }
614 while (maxRetries < ++attempt && !connectionSerivce.isTerminating());
615
616 this.failed.set(true);
617 }
618
619 private ThreadFactory factory = new ThreadFactory() {
620 public Thread newThread(Runnable runnable) {
621 Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: ");
622 thread.setDaemon(true);
623 return thread;
624 }
625 };
626
627 private ThreadPoolExecutor createExecutor() {
628 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, 2, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
629 exec.allowCoreThreadTimeOut(true);
630 return exec;
631 }
632 }