001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.failover;
018
019 import java.io.BufferedReader;
020 import java.io.FileReader;
021 import java.io.IOException;
022 import java.io.InputStreamReader;
023 import java.io.InterruptedIOException;
024 import java.net.InetAddress;
025 import java.net.MalformedURLException;
026 import java.net.URI;
027 import java.net.URL;
028 import java.util.ArrayList;
029 import java.util.Collections;
030 import java.util.HashSet;
031 import java.util.Iterator;
032 import java.util.LinkedHashMap;
033 import java.util.List;
034 import java.util.Map;
035 import java.util.StringTokenizer;
036 import java.util.concurrent.CopyOnWriteArrayList;
037 import java.util.concurrent.atomic.AtomicReference;
038
039 import org.apache.activemq.broker.SslContext;
040 import org.apache.activemq.command.Command;
041 import org.apache.activemq.command.ConnectionControl;
042 import org.apache.activemq.command.ConnectionId;
043 import org.apache.activemq.command.MessageDispatch;
044 import org.apache.activemq.command.MessagePull;
045 import org.apache.activemq.command.RemoveInfo;
046 import org.apache.activemq.command.Response;
047 import org.apache.activemq.state.ConnectionStateTracker;
048 import org.apache.activemq.state.Tracked;
049 import org.apache.activemq.thread.Task;
050 import org.apache.activemq.thread.TaskRunner;
051 import org.apache.activemq.thread.TaskRunnerFactory;
052 import org.apache.activemq.transport.CompositeTransport;
053 import org.apache.activemq.transport.DefaultTransportListener;
054 import org.apache.activemq.transport.FutureResponse;
055 import org.apache.activemq.transport.ResponseCallback;
056 import org.apache.activemq.transport.Transport;
057 import org.apache.activemq.transport.TransportFactory;
058 import org.apache.activemq.transport.TransportListener;
059 import org.apache.activemq.util.IOExceptionSupport;
060 import org.apache.activemq.util.ServiceSupport;
061 import org.slf4j.Logger;
062 import org.slf4j.LoggerFactory;
063
064 /**
065 * A Transport that is made reliable by being able to fail over to another
066 * transport when a transport failure is detected.
067 */
068 public class FailoverTransport implements CompositeTransport {
069
// Class-wide logger.
070 private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
// Default reconnect delay; initial value of both initialReconnectDelay and reconnectDelay below.
// The delay is passed to Object.wait() and logged as "ms" by doDelay().
071 private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
// Sentinel meaning "no limit"; default for timeout and for both reconnect-attempt limits.
072 private static final int INFINITE = -1;
// Listener registered through setTransportListener(); null until one is set.
073 private TransportListener transportListener;
// Set by stop(); once true the send loop and the reconnect logic give up.
074 private boolean disposed;
// True after doReconnect() succeeds; cleared on transport failure and by stop().
075 private boolean connected;
// Configured broker URIs, plus a second list that getConnectList() returns instead whenever it is non-empty.
076 private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
077 private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();
078
// Monitors: reconnectMutex is held while (re)connecting and is what oneway() waits on;
// backupMutex is held while touching the backups list; sleepMutex is waited on for reconnect
// delays; listenerMutex is notified when a transport listener is registered.
079 private final Object reconnectMutex = new Object();
080 private final Object backupMutex = new Object();
081 private final Object sleepMutex = new Object();
082 private final Object listenerMutex = new Object();
// Tracks connection state so it can be restored onto a new transport (see restoreTransport()).
083 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
// Commands sent by oneway() that still expect a response, keyed by command id; replayed by
// restoreTransport(). Accesses are expected to synchronize on the map itself.
084 private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();
085
// URI of the live transport, and the URI that most recently failed (moved to the end of the
// connect list by getConnectList()).
086 private URI connectedTransportURI;
087 private URI failedConnectTransportURI;
// The live transport, or null while disconnected.
088 private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
// Factory and runner for the background reconnect task created in the constructor.
089 private final TaskRunnerFactory reconnectTaskFactory;
090 private final TaskRunner reconnectTask;
// Set by start(), cleared by stop().
091 private boolean started;
// Set when the first command arrives on a transport; cleared on transport failure.
092 private boolean initialized;
// Reconnect tuning; see doDelay() and resetReconnectDelay() for how these interact.
093 private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
094 private long maxReconnectDelay = 1000 * 30;
095 private double backOffMultiplier = 2d;
// Maximum time oneway() waits for a connection; values <= 0 mean wait indefinitely.
096 private long timeout = INFINITE;
097 private boolean useExponentialBackOff = true;
// When true, the connect list and the choice of backup are shuffled.
098 private boolean randomize = true;
099 private int maxReconnectAttempts = INFINITE;
100 private int startupMaxReconnectAttempts = INFINITE;
// Consecutive failed reconnect rounds; reset to 0 on a successful connect.
101 private int connectFailures;
102 private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
// Terminal failure recorded once the attempt limit is reached; makes oneway() fail fast.
103 private Exception connectionFailure;
// True until the first successful connection.
104 private boolean firstConnection = true;
105 // optionally always have a backup created
106 private boolean backup = false;
107 private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
108 private int backupPoolSize = 1;
// State-tracker options, pushed to stateTracker in start().
109 private boolean trackMessages = false;
110 private boolean trackTransactionProducers = true;
111 private int maxCacheSize = 128 * 1024;
// Listener installed on transports being disposed (see disposeTransport()).
112 private final TransportListener disposedListener = new DefaultTransportListener() {
113 };
// Listener installed on every transport this class connects.
114 private final TransportListener myTransportListener = createTransportListener();
115 private boolean updateURIsSupported = true;
116 private boolean reconnectSupported = true;
117 // remember for reconnect thread
118 private SslContext brokerSslContext;
// Optional location read by doUpdateURIsFromDisk() for additional broker URIs.
119 private String updateURIsURL = null;
120 private boolean rebalanceUpdateURIs = true;
// Request flag: the next reconnect iteration should rebalance onto the first URI of the connect list.
121 private boolean doRebalance = false;
// Result of isPriority(connectedTransportURI) after the last reconnect attempt.
122 private boolean connectedToPriority = false;
123
// Priority-backup support: URIs added by setPriorityURIs() and related flags.
124 private boolean priorityBackup = false;
125 private ArrayList<URI> priorityList = new ArrayList<URI>();
126 private boolean priorityBackupAvailable = false;
127
/**
 * Creates the transport together with its background reconnect task.
 * <p>
 * The calling thread's {@link SslContext} is remembered so that the reconnect
 * task can reinstall it while connecting. The task is a no-op until the
 * transport has been started.
 *
 * @throws InterruptedIOException declared by the constructor signature
 */
public FailoverTransport() throws InterruptedIOException {
    brokerSslContext = SslContext.getCurrentSslContext();
    stateTracker.setTrackTransactions(true);
    // Set up the task that re-establishes a connection asynchronously.
    reconnectTaskFactory = new TaskRunnerFactory();
    reconnectTaskFactory.init();
    reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
        public boolean iterate() {
            if (!started) {
                return false;
            }

            boolean again = false;
            boolean reconnectAttempted = false;
            synchronized (backupMutex) {
                boolean wantsConnect = connectedTransport.get() == null || doRebalance || priorityBackupAvailable;
                if (wantsConnect && !disposed) {
                    again = doReconnect();
                    reconnectAttempted = true;
                    connectedToPriority = isPriority(connectedTransportURI);
                }
            }

            if (reconnectAttempted) {
                // Backups are built on the next iteration, so schedule one.
                try {
                    if (reconnectTask == null) {
                        return true;
                    }
                    reconnectTask.wakeup();
                } catch (InterruptedException e) {
                    LOG.debug("Reconnect task has been interrupted.", e);
                }
                return again;
            }

            buildBackups();
            if (priorityBackup && !connectedToPriority) {
                // Not on a priority broker yet: wait, then run another iteration.
                try {
                    doDelay();
                    if (reconnectTask == null) {
                        return true;
                    }
                    reconnectTask.wakeup();
                } catch (InterruptedException e) {
                    LOG.debug("Reconnect task has been interrupted.", e);
                }
            }
            return again;
        }

    }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
}
178
179 TransportListener createTransportListener() {
180 return new TransportListener() {
181 public void onCommand(Object o) {
182 Command command = (Command) o;
183 if (command == null) {
184 return;
185 }
186 if (command.isResponse()) {
187 Object object = null;
188 synchronized (requestMap) {
189 object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
190 }
191 if (object != null && object.getClass() == Tracked.class) {
192 ((Tracked) object).onResponses(command);
193 }
194 }
195 if (!initialized) {
196 initialized = true;
197 }
198
199 if (command.isConnectionControl()) {
200 handleConnectionControl((ConnectionControl) command);
201 }
202 if (transportListener != null) {
203 transportListener.onCommand(command);
204 }
205 }
206
207 public void onException(IOException error) {
208 try {
209 handleTransportFailure(error);
210 } catch (InterruptedException e) {
211 Thread.currentThread().interrupt();
212 transportListener.onException(new InterruptedIOException());
213 }
214 }
215
216 public void transportInterupted() {
217 if (transportListener != null) {
218 transportListener.transportInterupted();
219 }
220 }
221
222 public void transportResumed() {
223 if (transportListener != null) {
224 transportListener.transportResumed();
225 }
226 }
227 };
228 }
229
230 public final void disposeTransport(Transport transport) {
231 transport.setTransportListener(disposedListener);
232 ServiceSupport.dispose(transport);
233 }
234
235 public final void handleTransportFailure(IOException e) throws InterruptedException {
236 if (LOG.isTraceEnabled()) {
237 LOG.trace(this + " handleTransportFailure: " + e);
238 }
239 Transport transport = connectedTransport.getAndSet(null);
240 if (transport == null) {
241 // sync with possible in progress reconnect
242 synchronized (reconnectMutex) {
243 transport = connectedTransport.getAndSet(null);
244 }
245 }
246 if (transport != null) {
247
248 disposeTransport(transport);
249
250 boolean reconnectOk = false;
251 synchronized (reconnectMutex) {
252 if (canReconnect()) {
253 reconnectOk = true;
254 }
255 LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed, reason: " + e
256 + (reconnectOk ? "," : ", not") +" attempting to automatically reconnect");
257
258 initialized = false;
259 failedConnectTransportURI = connectedTransportURI;
260 connectedTransportURI = null;
261 connected = false;
262
263 // notify before any reconnect attempt so ack state can be whacked
264 if (transportListener != null) {
265 transportListener.transportInterupted();
266 }
267
268 if (reconnectOk) {
269 updated.remove(failedConnectTransportURI);
270 reconnectTask.wakeup();
271 } else if (!isDisposed()) {
272 propagateFailureToExceptionListener(e);
273 }
274 }
275 }
276 }
277
278 private boolean canReconnect() {
279 return started && 0 != calculateReconnectAttemptLimit();
280 }
281
282 public final void handleConnectionControl(ConnectionControl control) {
283 String reconnectStr = control.getReconnectTo();
284 if (reconnectStr != null) {
285 reconnectStr = reconnectStr.trim();
286 if (reconnectStr.length() > 0) {
287 try {
288 URI uri = new URI(reconnectStr);
289 if (isReconnectSupported()) {
290 reconnect(uri);
291 LOG.info("Reconnected to: " + uri);
292 }
293 } catch (Exception e) {
294 LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
295 }
296 }
297 }
298 processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
299 }
300
301 private final void processNewTransports(boolean rebalance, String newTransports) {
302 if (newTransports != null) {
303 newTransports = newTransports.trim();
304 if (newTransports.length() > 0 && isUpdateURIsSupported()) {
305 List<URI> list = new ArrayList<URI>();
306 StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
307 while (tokenizer.hasMoreTokens()) {
308 String str = tokenizer.nextToken();
309 try {
310 URI uri = new URI(str);
311 list.add(uri);
312 } catch (Exception e) {
313 LOG.error("Failed to parse broker address: " + str, e);
314 }
315 }
316 if (list.isEmpty() == false) {
317 try {
318 updateURIs(rebalance, list.toArray(new URI[list.size()]));
319 } catch (IOException e) {
320 LOG.error("Failed to update transport URI's from: " + newTransports, e);
321 }
322 }
323 }
324 }
325 }
326
327 public void start() throws Exception {
328 synchronized (reconnectMutex) {
329 if (LOG.isDebugEnabled()) {
330 LOG.debug("Started " + this);
331 }
332 if (started) {
333 return;
334 }
335 started = true;
336 stateTracker.setMaxCacheSize(getMaxCacheSize());
337 stateTracker.setTrackMessages(isTrackMessages());
338 stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
339 if (connectedTransport.get() != null) {
340 stateTracker.restore(connectedTransport.get());
341 } else {
342 reconnect(false);
343 }
344 }
345 }
346
347 public void stop() throws Exception {
348 Transport transportToStop = null;
349 List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
350
351 try {
352 synchronized (reconnectMutex) {
353 if (LOG.isDebugEnabled()) {
354 LOG.debug("Stopped " + this);
355 }
356 if (!started) {
357 return;
358 }
359 started = false;
360 disposed = true;
361 connected = false;
362
363 if (connectedTransport.get() != null) {
364 transportToStop = connectedTransport.getAndSet(null);
365 }
366 reconnectMutex.notifyAll();
367 }
368 synchronized (sleepMutex) {
369 sleepMutex.notifyAll();
370 }
371 } finally {
372 reconnectTask.shutdown();
373 reconnectTaskFactory.shutdownNow();
374 }
375
376 synchronized(backupMutex) {
377 for (BackupTransport backup : backups) {
378 backup.setDisposed(true);
379 Transport transport = backup.getTransport();
380 if (transport != null) {
381 transport.setTransportListener(disposedListener);
382 backupsToStop.add(transport);
383 }
384 }
385 backups.clear();
386 }
387 for (Transport transport : backupsToStop) {
388 try {
389 if (LOG.isTraceEnabled()) {
390 LOG.trace("Stopped backup: " + transport);
391 }
392 disposeTransport(transport);
393 } catch (Exception e) {
394 }
395 }
396 if (transportToStop != null) {
397 transportToStop.stop();
398 }
399 }
400
401 public long getInitialReconnectDelay() {
402 return initialReconnectDelay;
403 }
404
405 public void setInitialReconnectDelay(long initialReconnectDelay) {
406 this.initialReconnectDelay = initialReconnectDelay;
407 }
408
409 public long getMaxReconnectDelay() {
410 return maxReconnectDelay;
411 }
412
413 public void setMaxReconnectDelay(long maxReconnectDelay) {
414 this.maxReconnectDelay = maxReconnectDelay;
415 }
416
417 public long getReconnectDelay() {
418 return reconnectDelay;
419 }
420
421 public void setReconnectDelay(long reconnectDelay) {
422 this.reconnectDelay = reconnectDelay;
423 }
424
425 public double getReconnectDelayExponent() {
426 return backOffMultiplier;
427 }
428
429 public void setReconnectDelayExponent(double reconnectDelayExponent) {
430 this.backOffMultiplier = reconnectDelayExponent;
431 }
432
433 public Transport getConnectedTransport() {
434 return connectedTransport.get();
435 }
436
437 public URI getConnectedTransportURI() {
438 return connectedTransportURI;
439 }
440
441 public int getMaxReconnectAttempts() {
442 return maxReconnectAttempts;
443 }
444
445 public void setMaxReconnectAttempts(int maxReconnectAttempts) {
446 this.maxReconnectAttempts = maxReconnectAttempts;
447 }
448
449 public int getStartupMaxReconnectAttempts() {
450 return this.startupMaxReconnectAttempts;
451 }
452
453 public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
454 this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
455 }
456
457 public long getTimeout() {
458 return timeout;
459 }
460
461 public void setTimeout(long timeout) {
462 this.timeout = timeout;
463 }
464
465 /**
466 * @return Returns the randomize.
467 */
468 public boolean isRandomize() {
469 return randomize;
470 }
471
472 /**
473 * @param randomize The randomize to set.
474 */
475 public void setRandomize(boolean randomize) {
476 this.randomize = randomize;
477 }
478
479 public boolean isBackup() {
480 return backup;
481 }
482
483 public void setBackup(boolean backup) {
484 this.backup = backup;
485 }
486
487 public int getBackupPoolSize() {
488 return backupPoolSize;
489 }
490
491 public void setBackupPoolSize(int backupPoolSize) {
492 this.backupPoolSize = backupPoolSize;
493 }
494
495 public int getCurrentBackups() {
496 return this.backups.size();
497 }
498
499 public boolean isTrackMessages() {
500 return trackMessages;
501 }
502
503 public void setTrackMessages(boolean trackMessages) {
504 this.trackMessages = trackMessages;
505 }
506
507 public boolean isTrackTransactionProducers() {
508 return this.trackTransactionProducers;
509 }
510
511 public void setTrackTransactionProducers(boolean trackTransactionProducers) {
512 this.trackTransactionProducers = trackTransactionProducers;
513 }
514
515 public int getMaxCacheSize() {
516 return maxCacheSize;
517 }
518
519 public void setMaxCacheSize(int maxCacheSize) {
520 this.maxCacheSize = maxCacheSize;
521 }
522
523 public boolean isPriorityBackup() {
524 return priorityBackup;
525 }
526
527 public void setPriorityBackup(boolean priorityBackup) {
528 this.priorityBackup = priorityBackup;
529 }
530
531 public void setPriorityURIs(String priorityURIs) {
532 StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
533 while (tokenizer.hasMoreTokens()) {
534 String str = tokenizer.nextToken();
535 try {
536 URI uri = new URI(str);
537 priorityList.add(uri);
538 } catch (Exception e) {
539 LOG.error("Failed to parse broker address: " + str, e);
540 }
541 }
542 }
543
544 public void oneway(Object o) throws IOException {
545
546 Command command = (Command) o;
547 Exception error = null;
548 try {
549
550 synchronized (reconnectMutex) {
551
552 if (command != null && connectedTransport.get() == null) {
553 if (command.isShutdownInfo()) {
554 // Skipping send of ShutdownInfo command when not connected.
555 return;
556 } else if (command instanceof RemoveInfo || command.isMessageAck()) {
557 // Simulate response to RemoveInfo command or MessageAck (as it will be stale)
558 stateTracker.track(command);
559 if (command.isResponseRequired()) {
560 Response response = new Response();
561 response.setCorrelationId(command.getCommandId());
562 myTransportListener.onCommand(response);
563 }
564 return;
565 } else if (command instanceof MessagePull) {
566 // Simulate response to MessagePull if timed as we can't honor that now.
567 MessagePull pullRequest = (MessagePull) command;
568 if (pullRequest.getTimeout() != 0) {
569 MessageDispatch dispatch = new MessageDispatch();
570 dispatch.setConsumerId(pullRequest.getConsumerId());
571 dispatch.setDestination(pullRequest.getDestination());
572 myTransportListener.onCommand(dispatch);
573 }
574 return;
575 }
576 }
577
578 // Keep trying until the message is sent.
579 for (int i = 0; !disposed; i++) {
580 try {
581
582 // Wait for transport to be connected.
583 Transport transport = connectedTransport.get();
584 long start = System.currentTimeMillis();
585 boolean timedout = false;
586 while (transport == null && !disposed && connectionFailure == null
587 && !Thread.currentThread().isInterrupted()) {
588 if (LOG.isTraceEnabled()) {
589 LOG.trace("Waiting for transport to reconnect..: " + command);
590 }
591 long end = System.currentTimeMillis();
592 if (timeout > 0 && (end - start > timeout)) {
593 timedout = true;
594 if (LOG.isInfoEnabled()) {
595 LOG.info("Failover timed out after " + (end - start) + "ms");
596 }
597 break;
598 }
599 try {
600 reconnectMutex.wait(100);
601 } catch (InterruptedException e) {
602 Thread.currentThread().interrupt();
603 if (LOG.isDebugEnabled()) {
604 LOG.debug("Interupted: " + e, e);
605 }
606 }
607 transport = connectedTransport.get();
608 }
609
610 if (transport == null) {
611 // Previous loop may have exited due to use being
612 // disposed.
613 if (disposed) {
614 error = new IOException("Transport disposed.");
615 } else if (connectionFailure != null) {
616 error = connectionFailure;
617 } else if (timedout == true) {
618 error = new IOException("Failover timeout of " + timeout + " ms reached.");
619 } else {
620 error = new IOException("Unexpected failure.");
621 }
622 break;
623 }
624
625 // If it was a request and it was not being tracked by
626 // the state tracker,
627 // then hold it in the requestMap so that we can replay
628 // it later.
629 Tracked tracked = stateTracker.track(command);
630 synchronized (requestMap) {
631 if (tracked != null && tracked.isWaitingForResponse()) {
632 requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
633 } else if (tracked == null && command.isResponseRequired()) {
634 requestMap.put(Integer.valueOf(command.getCommandId()), command);
635 }
636 }
637
638 // Send the message.
639 try {
640 transport.oneway(command);
641 stateTracker.trackBack(command);
642 } catch (IOException e) {
643
644 // If the command was not tracked.. we will retry in
645 // this method
646 if (tracked == null) {
647
648 // since we will retry in this method.. take it
649 // out of the request
650 // map so that it is not sent 2 times on
651 // recovery
652 if (command.isResponseRequired()) {
653 requestMap.remove(Integer.valueOf(command.getCommandId()));
654 }
655
656 // Rethrow the exception so it will handled by
657 // the outer catch
658 throw e;
659 } else {
660 // Handle the error but allow the method to return since the
661 // tracked commands are replayed on reconnect.
662 if (LOG.isDebugEnabled()) {
663 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
664 }
665 handleTransportFailure(e);
666 }
667 }
668
669 return;
670
671 } catch (IOException e) {
672 if (LOG.isDebugEnabled()) {
673 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
674 }
675 handleTransportFailure(e);
676 }
677 }
678 }
679 } catch (InterruptedException e) {
680 // Some one may be trying to stop our thread.
681 Thread.currentThread().interrupt();
682 throw new InterruptedIOException();
683 }
684
685 if (!disposed) {
686 if (error != null) {
687 if (error instanceof IOException) {
688 throw (IOException) error;
689 }
690 throw IOExceptionSupport.create(error);
691 }
692 }
693 }
694
695 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
696 throw new AssertionError("Unsupported Method");
697 }
698
699 public Object request(Object command) throws IOException {
700 throw new AssertionError("Unsupported Method");
701 }
702
703 public Object request(Object command, int timeout) throws IOException {
704 throw new AssertionError("Unsupported Method");
705 }
706
707 public void add(boolean rebalance, URI u[]) {
708 boolean newURI = false;
709 for (URI uri : u) {
710 if (!contains(uri)) {
711 uris.add(uri);
712 newURI = true;
713 }
714 }
715 if (newURI) {
716 reconnect(rebalance);
717 }
718 }
719
720 public void remove(boolean rebalance, URI u[]) {
721 for (URI uri : u) {
722 uris.remove(uri);
723 }
724 // rebalance is automatic if any connected to removed/stopped broker
725 }
726
727 public void add(boolean rebalance, String u) {
728 try {
729 URI newURI = new URI(u);
730 if (contains(newURI) == false) {
731 uris.add(newURI);
732 reconnect(rebalance);
733 }
734
735 } catch (Exception e) {
736 LOG.error("Failed to parse URI: " + u);
737 }
738 }
739
740 public void reconnect(boolean rebalance) {
741 synchronized (reconnectMutex) {
742 if (started) {
743 if (rebalance) {
744 doRebalance = true;
745 }
746 LOG.debug("Waking up reconnect task");
747 try {
748 reconnectTask.wakeup();
749 } catch (InterruptedException e) {
750 Thread.currentThread().interrupt();
751 }
752 } else {
753 LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
754 }
755 }
756 }
757
758 private List<URI> getConnectList() {
759 if (!updated.isEmpty()) {
760 if (failedConnectTransportURI != null) {
761 boolean removed = updated.remove(failedConnectTransportURI);
762 if (removed) {
763 updated.add(failedConnectTransportURI);
764 }
765 }
766 return updated;
767 }
768 ArrayList<URI> l = new ArrayList<URI>(uris);
769 boolean removed = false;
770 if (failedConnectTransportURI != null) {
771 removed = l.remove(failedConnectTransportURI);
772 }
773 if (randomize) {
774 // Randomly, reorder the list by random swapping
775 for (int i = 0; i < l.size(); i++) {
776 int p = (int) (Math.random() * 100 % l.size());
777 URI t = l.get(p);
778 l.set(p, l.get(i));
779 l.set(i, t);
780 }
781 }
782 if (removed) {
783 l.add(failedConnectTransportURI);
784 }
785 if (LOG.isDebugEnabled()) {
786 LOG.debug("urlList connectionList:" + l + ", from: " + uris);
787 }
788 return l;
789 }
790
791 public TransportListener getTransportListener() {
792 return transportListener;
793 }
794
795 public void setTransportListener(TransportListener commandListener) {
796 synchronized (listenerMutex) {
797 this.transportListener = commandListener;
798 listenerMutex.notifyAll();
799 }
800 }
801
802 public <T> T narrow(Class<T> target) {
803
804 if (target.isAssignableFrom(getClass())) {
805 return target.cast(this);
806 }
807 Transport transport = connectedTransport.get();
808 if (transport != null) {
809 return transport.narrow(target);
810 }
811 return null;
812
813 }
814
815 protected void restoreTransport(Transport t) throws Exception, IOException {
816 t.start();
817 // send information to the broker - informing it we are an ft client
818 ConnectionControl cc = new ConnectionControl();
819 cc.setFaultTolerant(true);
820 t.oneway(cc);
821 stateTracker.restore(t);
822 Map<Integer, Command> tmpMap = null;
823 synchronized (requestMap) {
824 tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
825 }
826 for (Command command : tmpMap.values()) {
827 if (LOG.isTraceEnabled()) {
828 LOG.trace("restore requestMap, replay: " + command);
829 }
830 t.oneway(command);
831 }
832 }
833
834 public boolean isUseExponentialBackOff() {
835 return useExponentialBackOff;
836 }
837
838 public void setUseExponentialBackOff(boolean useExponentialBackOff) {
839 this.useExponentialBackOff = useExponentialBackOff;
840 }
841
842 @Override
843 public String toString() {
844 return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
845 }
846
847 public String getRemoteAddress() {
848 Transport transport = connectedTransport.get();
849 if (transport != null) {
850 return transport.getRemoteAddress();
851 }
852 return null;
853 }
854
855 public boolean isFaultTolerant() {
856 return true;
857 }
858
859 private void doUpdateURIsFromDisk() {
860 // If updateURIsURL is specified, read the file and add any new
861 // transport URI's to this FailOverTransport.
862 // Note: Could track file timestamp to avoid unnecessary reading.
863 String fileURL = getUpdateURIsURL();
864 if (fileURL != null) {
865 BufferedReader in = null;
866 String newUris = null;
867 StringBuffer buffer = new StringBuffer();
868
869 try {
870 in = new BufferedReader(getURLStream(fileURL));
871 while (true) {
872 String line = in.readLine();
873 if (line == null) {
874 break;
875 }
876 buffer.append(line);
877 }
878 newUris = buffer.toString();
879 } catch (IOException ioe) {
880 LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
881 } finally {
882 if (in != null) {
883 try {
884 in.close();
885 } catch (IOException ioe) {
886 // ignore
887 }
888 }
889 }
890
891 processNewTransports(isRebalanceUpdateURIs(), newUris);
892 }
893 }
894
895 final boolean doReconnect() {
896 Exception failure = null;
897 synchronized (reconnectMutex) {
898
899 // First ensure we are up to date.
900 doUpdateURIsFromDisk();
901
902 if (disposed || connectionFailure != null) {
903 reconnectMutex.notifyAll();
904 }
905 if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
906 return false;
907 } else {
908 List<URI> connectList = getConnectList();
909 if (connectList.isEmpty()) {
910 failure = new IOException("No uris available to connect to.");
911 } else {
912 if (doRebalance) {
913 if (connectList.get(0).equals(connectedTransportURI)) {
914 // already connected to first in the list, no need to rebalance
915 doRebalance = false;
916 return false;
917 } else {
918 if (LOG.isDebugEnabled()) {
919 LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
920 }
921 try {
922 Transport transport = this.connectedTransport.getAndSet(null);
923 if (transport != null) {
924 disposeTransport(transport);
925 }
926 } catch (Exception e) {
927 if (LOG.isDebugEnabled()) {
928 LOG.debug("Caught an exception stopping existing transport for rebalance", e);
929 }
930 }
931 }
932 doRebalance = false;
933 }
934
935 resetReconnectDelay();
936
937 Transport transport = null;
938 URI uri = null;
939
940 // If we have a backup already waiting lets try it.
941 synchronized (backupMutex) {
942 if ((priorityBackup || backup) && !backups.isEmpty()) {
943 ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups);
944 if (randomize) {
945 Collections.shuffle(l);
946 }
947 BackupTransport bt = l.remove(0);
948 backups.remove(bt);
949 transport = bt.getTransport();
950 uri = bt.getUri();
951 if (priorityBackup && priorityBackupAvailable) {
952 Transport old = this.connectedTransport.getAndSet(null);
953 if (transport != null) {
954 disposeTransport(old);
955 }
956 priorityBackupAvailable = false;
957 }
958 }
959 }
960
961 // Sleep for the reconnectDelay if there's no backup and we aren't trying
962 // for the first time, or we were disposed for some reason.
963 if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
964 synchronized (sleepMutex) {
965 if (LOG.isDebugEnabled()) {
966 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
967 }
968 try {
969 sleepMutex.wait(reconnectDelay);
970 } catch (InterruptedException e) {
971 Thread.currentThread().interrupt();
972 }
973 }
974 }
975
976 Iterator<URI> iter = connectList.iterator();
977 while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
978
979 try {
980 SslContext.setCurrentSslContext(brokerSslContext);
981
982 // We could be starting with a backup and if so we wait to grab a
983 // URI from the pool until next time around.
984 if (transport == null) {
985 uri = iter.next();
986 transport = TransportFactory.compositeConnect(uri);
987 }
988
989 if (LOG.isDebugEnabled()) {
990 LOG.debug("Attempting " + connectFailures + "th connect to: " + uri);
991 }
992 transport.setTransportListener(myTransportListener);
993 transport.start();
994
995 if (started && !firstConnection) {
996 restoreTransport(transport);
997 }
998
999 if (LOG.isDebugEnabled()) {
1000 LOG.debug("Connection established");
1001 }
1002 reconnectDelay = initialReconnectDelay;
1003 connectedTransportURI = uri;
1004 connectedTransport.set(transport);
1005 reconnectMutex.notifyAll();
1006 connectFailures = 0;
1007
1008 // Make sure on initial startup, that the transportListener
1009 // has been initialized for this instance.
1010 synchronized (listenerMutex) {
1011 if (transportListener == null) {
1012 try {
1013 // if it isn't set after 2secs - it probably never will be
1014 listenerMutex.wait(2000);
1015 } catch (InterruptedException ex) {
1016 }
1017 }
1018 }
1019
1020 if (transportListener != null) {
1021 transportListener.transportResumed();
1022 } else {
1023 if (LOG.isDebugEnabled()) {
1024 LOG.debug("transport resumed by transport listener not set");
1025 }
1026 }
1027
1028 if (firstConnection) {
1029 firstConnection = false;
1030 LOG.info("Successfully connected to " + uri);
1031 } else {
1032 LOG.info("Successfully reconnected to " + uri);
1033 }
1034
1035 connected = true;
1036 return false;
1037 } catch (Exception e) {
1038 failure = e;
1039 if (LOG.isDebugEnabled()) {
1040 LOG.debug("Connect fail to: " + uri + ", reason: " + e);
1041 }
1042 if (transport != null) {
1043 try {
1044 transport.stop();
1045 transport = null;
1046 } catch (Exception ee) {
1047 if (LOG.isDebugEnabled()) {
1048 LOG.debug("Stop of failed transport: " + transport +
1049 " failed with reason: " + ee);
1050 }
1051 }
1052 }
1053 } finally {
1054 SslContext.setCurrentSslContext(null);
1055 }
1056 }
1057 }
1058 }
1059
1060 int reconnectLimit = calculateReconnectAttemptLimit();
1061
1062 connectFailures++;
1063 if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
1064 LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
1065 connectionFailure = failure;
1066
1067 // Make sure on initial startup, that the transportListener has been
1068 // initialized for this instance.
1069 synchronized (listenerMutex) {
1070 if (transportListener == null) {
1071 try {
1072 listenerMutex.wait(2000);
1073 } catch (InterruptedException ex) {
1074 }
1075 }
1076 }
1077
1078 propagateFailureToExceptionListener(connectionFailure);
1079 return false;
1080 }
1081 }
1082
1083 if (!disposed) {
1084 doDelay();
1085 }
1086
1087 return !disposed;
1088 }
1089
1090 private void doDelay() {
1091 if (reconnectDelay > 0) {
1092 synchronized (sleepMutex) {
1093 if (LOG.isDebugEnabled()) {
1094 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
1095 }
1096 try {
1097 sleepMutex.wait(reconnectDelay);
1098 } catch (InterruptedException e) {
1099 Thread.currentThread().interrupt();
1100 }
1101 }
1102 }
1103
1104 if (useExponentialBackOff) {
1105 // Exponential increment of reconnect delay.
1106 reconnectDelay *= backOffMultiplier;
1107 if (reconnectDelay > maxReconnectDelay) {
1108 reconnectDelay = maxReconnectDelay;
1109 }
1110 }
1111 }
1112
1113 private void resetReconnectDelay() {
1114 if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
1115 reconnectDelay = initialReconnectDelay;
1116 }
1117 }
1118
1119 /*
1120 * called with reconnectMutex held
1121 */
1122 private void propagateFailureToExceptionListener(Exception exception) {
1123 if (transportListener != null) {
1124 if (exception instanceof IOException) {
1125 transportListener.onException((IOException)exception);
1126 } else {
1127 transportListener.onException(IOExceptionSupport.create(exception));
1128 }
1129 }
1130 reconnectMutex.notifyAll();
1131 }
1132
1133 private int calculateReconnectAttemptLimit() {
1134 int maxReconnectValue = this.maxReconnectAttempts;
1135 if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
1136 maxReconnectValue = this.startupMaxReconnectAttempts;
1137 }
1138 return maxReconnectValue;
1139 }
1140
1141 final boolean buildBackups() {
1142 synchronized (backupMutex) {
1143 if (!disposed && (backup || priorityBackup) && backups.size() < backupPoolSize) {
1144 ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
1145 List<URI> connectList = getConnectList();
1146 for (URI uri: connectList) {
1147 if (!backupList.contains(uri)) {
1148 backupList.add(uri);
1149 }
1150 }
1151 // removed disposed backups
1152 List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
1153 for (BackupTransport bt : backups) {
1154 if (bt.isDisposed()) {
1155 disposedList.add(bt);
1156 }
1157 }
1158 backups.removeAll(disposedList);
1159 disposedList.clear();
1160 for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && backups.size() < backupPoolSize; ) {
1161 URI uri = iter.next();
1162 if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
1163 try {
1164 SslContext.setCurrentSslContext(brokerSslContext);
1165 BackupTransport bt = new BackupTransport(this);
1166 bt.setUri(uri);
1167 if (!backups.contains(bt)) {
1168 Transport t = TransportFactory.compositeConnect(uri);
1169 t.setTransportListener(bt);
1170 t.start();
1171 bt.setTransport(t);
1172 backups.add(bt);
1173 if (priorityBackup && isPriority(uri)) {
1174 priorityBackupAvailable = true;
1175 }
1176 }
1177 } catch (Exception e) {
1178 LOG.debug("Failed to build backup ", e);
1179 } finally {
1180 SslContext.setCurrentSslContext(null);
1181 }
1182 }
1183 }
1184 }
1185 }
1186 return false;
1187 }
1188
1189 protected boolean isPriority(URI uri) {
1190 if (!priorityList.isEmpty()) {
1191 return priorityList.contains(uri);
1192 }
1193 return uris.indexOf(uri) == 0;
1194 }
1195
1196 public boolean isDisposed() {
1197 return disposed;
1198 }
1199
1200 public boolean isConnected() {
1201 return connected;
1202 }
1203
1204 public void reconnect(URI uri) throws IOException {
1205 add(true, new URI[]{uri});
1206 }
1207
1208 public boolean isReconnectSupported() {
1209 return this.reconnectSupported;
1210 }
1211
1212 public void setReconnectSupported(boolean value) {
1213 this.reconnectSupported = value;
1214 }
1215
1216 public boolean isUpdateURIsSupported() {
1217 return this.updateURIsSupported;
1218 }
1219
1220 public void setUpdateURIsSupported(boolean value) {
1221 this.updateURIsSupported = value;
1222 }
1223
1224 public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1225 if (isUpdateURIsSupported()) {
1226 HashSet<URI> copy = new HashSet<URI>(this.updated);
1227 updated.clear();
1228 if (updatedURIs != null && updatedURIs.length > 0) {
1229 for (URI uri : updatedURIs) {
1230 if (uri != null && !updated.contains(uri)) {
1231 updated.add(uri);
1232 }
1233 }
1234 if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) {
1235 buildBackups();
1236 synchronized (reconnectMutex) {
1237 reconnect(rebalance);
1238 }
1239 }
1240 }
1241 }
1242 }
1243
1244 /**
1245 * @return the updateURIsURL
1246 */
1247 public String getUpdateURIsURL() {
1248 return this.updateURIsURL;
1249 }
1250
1251 /**
1252 * @param updateURIsURL the updateURIsURL to set
1253 */
1254 public void setUpdateURIsURL(String updateURIsURL) {
1255 this.updateURIsURL = updateURIsURL;
1256 }
1257
1258 /**
1259 * @return the rebalanceUpdateURIs
1260 */
1261 public boolean isRebalanceUpdateURIs() {
1262 return this.rebalanceUpdateURIs;
1263 }
1264
1265 /**
1266 * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
1267 */
1268 public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
1269 this.rebalanceUpdateURIs = rebalanceUpdateURIs;
1270 }
1271
1272 public int getReceiveCounter() {
1273 Transport transport = connectedTransport.get();
1274 if (transport == null) {
1275 return 0;
1276 }
1277 return transport.getReceiveCounter();
1278 }
1279
1280 public int getConnectFailures() {
1281 return connectFailures;
1282 }
1283
1284 public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
1285 synchronized (reconnectMutex) {
1286 stateTracker.connectionInterruptProcessingComplete(this, connectionId);
1287 }
1288 }
1289
1290 public ConnectionStateTracker getStateTracker() {
1291 return stateTracker;
1292 }
1293
1294 private boolean contains(URI newURI) {
1295 boolean result = false;
1296 for (URI uri : uris) {
1297 if (newURI.getPort() == uri.getPort()) {
1298 InetAddress newAddr = null;
1299 InetAddress addr = null;
1300 try {
1301 newAddr = InetAddress.getByName(newURI.getHost());
1302 addr = InetAddress.getByName(uri.getHost());
1303 } catch(IOException e) {
1304
1305 if (newAddr == null) {
1306 LOG.error("Failed to Lookup INetAddress for URI[ " + newURI + " ] : " + e);
1307 } else {
1308 LOG.error("Failed to Lookup INetAddress for URI[ " + uri + " ] : " + e);
1309 }
1310
1311 if (newURI.getHost().equalsIgnoreCase(uri.getHost())) {
1312 result = true;
1313 break;
1314 } else {
1315 continue;
1316 }
1317 }
1318
1319 if (addr.equals(newAddr)) {
1320 result = true;
1321 break;
1322 }
1323 }
1324 }
1325
1326 return result;
1327 }
1328
1329 private InputStreamReader getURLStream(String path) throws IOException {
1330 InputStreamReader result = null;
1331 URL url = null;
1332 try {
1333 url = new URL(path);
1334 result = new InputStreamReader(url.openStream());
1335 } catch (MalformedURLException e) {
1336 // ignore - it could be a path to a a local file
1337 }
1338 if (result == null) {
1339 result = new FileReader(path);
1340 }
1341 return result;
1342 }
1343 }