001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.failover;
018    
019    import java.io.BufferedReader;
020    import java.io.FileReader;
021    import java.io.IOException;
022    import java.io.InputStreamReader;
023    import java.io.InterruptedIOException;
024    import java.net.InetAddress;
025    import java.net.MalformedURLException;
026    import java.net.URI;
027    import java.net.URL;
028    import java.util.ArrayList;
029    import java.util.Collections;
030    import java.util.HashSet;
031    import java.util.Iterator;
032    import java.util.LinkedHashMap;
033    import java.util.List;
034    import java.util.Map;
035    import java.util.StringTokenizer;
036    import java.util.concurrent.CopyOnWriteArrayList;
037    import java.util.concurrent.atomic.AtomicReference;
038    
039    import org.apache.activemq.broker.SslContext;
040    import org.apache.activemq.command.Command;
041    import org.apache.activemq.command.ConnectionControl;
042    import org.apache.activemq.command.ConnectionId;
043    import org.apache.activemq.command.MessageDispatch;
044    import org.apache.activemq.command.MessagePull;
045    import org.apache.activemq.command.RemoveInfo;
046    import org.apache.activemq.command.Response;
047    import org.apache.activemq.state.ConnectionStateTracker;
048    import org.apache.activemq.state.Tracked;
049    import org.apache.activemq.thread.Task;
050    import org.apache.activemq.thread.TaskRunner;
051    import org.apache.activemq.thread.TaskRunnerFactory;
052    import org.apache.activemq.transport.CompositeTransport;
053    import org.apache.activemq.transport.DefaultTransportListener;
054    import org.apache.activemq.transport.FutureResponse;
055    import org.apache.activemq.transport.ResponseCallback;
056    import org.apache.activemq.transport.Transport;
057    import org.apache.activemq.transport.TransportFactory;
058    import org.apache.activemq.transport.TransportListener;
059    import org.apache.activemq.util.IOExceptionSupport;
060    import org.apache.activemq.util.ServiceSupport;
061    import org.slf4j.Logger;
062    import org.slf4j.LoggerFactory;
063    
064    /**
065     * A Transport that is made reliable by being able to fail over to another
066     * transport when a transport failure is detected.
067     */
068    public class FailoverTransport implements CompositeTransport {
069    
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
    // Starting value for both initialReconnectDelay and reconnectDelay (units presumably ms -- confirm).
    private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
    // Sentinel default for timeout, maxReconnectAttempts and startupMaxReconnectAttempts.
    private static final int INFINITE = -1;
    // Application-side listener; commands and interruption/resumption events are forwarded to it.
    private TransportListener transportListener;
    // Set by stop(); once true, oneway() stops retrying and the reconnect task does not reconnect.
    private boolean disposed;
    private boolean connected;
    // Candidate broker URIs (see add/remove).
    private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
    // URI list from which a failed broker URI is removed in handleTransportFailure().
    private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();

    // Guards start/stop/oneway/reconnect state; oneway() waits on it for a transport.
    private final Object reconnectMutex = new Object();
    // Guards the backups list and the reconnect decision in the reconnect task.
    private final Object backupMutex = new Object();
    // Notified by stop(); presumably used to interrupt reconnect delays -- confirm in doDelay().
    private final Object sleepMutex = new Object();
    // NOTE(review): not used in the visible part of this file.
    private final Object listenerMutex = new Object();
    // Tracks sent commands (transactions always tracked) so state can be restored on reconnect.
    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
    // Commands awaiting a response, keyed by command id; values are either the Tracked returned by
    // stateTracker.track() or the raw Command. Always accessed under synchronized (requestMap).
    private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();

    private URI connectedTransportURI;
    // Copy of connectedTransportURI taken when the connected transport fails.
    private URI failedConnectTransportURI;
    // The live underlying transport, or null when disconnected; cleared atomically on failure/stop.
    private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
    private final TaskRunnerFactory reconnectTaskFactory;
    private final TaskRunner reconnectTask;
    private boolean started;
    // Set when the first command is received on a connection; reset on transport failure.
    private boolean initialized;
    private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    private long maxReconnectDelay = 1000 * 30;
    private double backOffMultiplier = 2d;
    // oneway() wait limit for a transport; values <= 0 disable the limit.
    private long timeout = INFINITE;
    private boolean useExponentialBackOff = true;
    private boolean randomize = true;
    private int maxReconnectAttempts = INFINITE;
    private int startupMaxReconnectAttempts = INFINITE;
    private int connectFailures;
    private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    // When non-null, oneway() stops waiting and reports this failure.
    private Exception connectionFailure;
    private boolean firstConnection = true;
    // optionally always have a backup created
    private boolean backup = false;
    private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
    private int backupPoolSize = 1;
    private boolean trackMessages = false;
    private boolean trackTransactionProducers = true;
    private int maxCacheSize = 128 * 1024;
    // Placeholder listener installed on transports being disposed so their late events are ignored.
    private final TransportListener disposedListener = new DefaultTransportListener() {
    };
    private final TransportListener myTransportListener = createTransportListener();
    private boolean updateURIsSupported = true;
    private boolean reconnectSupported = true;
    // remember for reconnect thread
    private SslContext brokerSslContext;
    private String updateURIsURL = null;
    private boolean rebalanceUpdateURIs = true;
    // When true, the reconnect task reconnects even though a transport is connected.
    private boolean doRebalance = false;
    private boolean connectedToPriority = false;

    private boolean priorityBackup = false;
    // Broker URIs added via setPriorityURIs().
    private ArrayList<URI> priorityList = new ArrayList<URI>();
    // When true, the reconnect task reconnects even though a transport is connected.
    private boolean priorityBackupAvailable = false;
127    
128        public FailoverTransport() throws InterruptedIOException {
129            brokerSslContext = SslContext.getCurrentSslContext();
130            stateTracker.setTrackTransactions(true);
131            // Setup a task that is used to reconnect the a connection async.
132            reconnectTaskFactory = new TaskRunnerFactory();
133            reconnectTaskFactory.init();
134            reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
135                public boolean iterate() {
136                    boolean result = false;
137                    if (!started) {
138                        return result;
139                    }
140                    boolean buildBackup = true;
141                    synchronized (backupMutex) {
142                        if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
143                            result = doReconnect();
144                            buildBackup = false;
145                            connectedToPriority = isPriority(connectedTransportURI);
146                        }
147                    }
148                    if (buildBackup) {
149                        buildBackups();
150                        if (priorityBackup && !connectedToPriority) {
151                            try {
152                                doDelay();
153                                if (reconnectTask == null) {
154                                    return true;
155                                }
156                                reconnectTask.wakeup();
157                            } catch (InterruptedException e) {
158                                LOG.debug("Reconnect task has been interrupted.", e);
159                            }
160                        }
161                    } else {
162                        // build backups on the next iteration
163                        buildBackup = true;
164                        try {
165                            if (reconnectTask == null) {
166                                return true;
167                            }
168                            reconnectTask.wakeup();
169                        } catch (InterruptedException e) {
170                            LOG.debug("Reconnect task has been interrupted.", e);
171                        }
172                    }
173                    return result;
174                }
175    
176            }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
177        }
178    
179        TransportListener createTransportListener() {
180            return new TransportListener() {
181                public void onCommand(Object o) {
182                    Command command = (Command) o;
183                    if (command == null) {
184                        return;
185                    }
186                    if (command.isResponse()) {
187                        Object object = null;
188                        synchronized (requestMap) {
189                            object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
190                        }
191                        if (object != null && object.getClass() == Tracked.class) {
192                            ((Tracked) object).onResponses(command);
193                        }
194                    }
195                    if (!initialized) {
196                        initialized = true;
197                    }
198    
199                    if (command.isConnectionControl()) {
200                        handleConnectionControl((ConnectionControl) command);
201                    }
202                    if (transportListener != null) {
203                        transportListener.onCommand(command);
204                    }
205                }
206    
207                public void onException(IOException error) {
208                    try {
209                        handleTransportFailure(error);
210                    } catch (InterruptedException e) {
211                        Thread.currentThread().interrupt();
212                        transportListener.onException(new InterruptedIOException());
213                    }
214                }
215    
216                public void transportInterupted() {
217                    if (transportListener != null) {
218                        transportListener.transportInterupted();
219                    }
220                }
221    
222                public void transportResumed() {
223                    if (transportListener != null) {
224                        transportListener.transportResumed();
225                    }
226                }
227            };
228        }
229    
230        public final void disposeTransport(Transport transport) {
231            transport.setTransportListener(disposedListener);
232            ServiceSupport.dispose(transport);
233        }
234    
    /**
     * Handles the failure of the currently connected transport.
     * <p>
     * The connected transport is detached atomically. If this call obtained a transport, that
     * transport is disposed and the connection state (initialized, URIs, connected flag) is reset.
     * The application listener is then told the transport was interrupted. Finally, either the
     * reconnect task is woken (when {@code canReconnect()} allows it) or, if this transport is not
     * disposed, the error is passed to {@code propagateFailureToExceptionListener}. If no transport
     * was connected, nothing further happens.
     *
     * @param e the I/O failure reported for the connected transport
     * @throws InterruptedException if waking the reconnect task is interrupted
     */
    public final void handleTransportFailure(IOException e) throws InterruptedException {
        if (LOG.isTraceEnabled()) {
            LOG.trace(this + " handleTransportFailure: " + e);
        }
        // Claim the transport: only the caller that receives a non-null value handles the failure.
        Transport transport = connectedTransport.getAndSet(null);
        if (transport == null) {
            // sync with possible in progress reconnect
            synchronized (reconnectMutex) {
                transport = connectedTransport.getAndSet(null);
            }
        }
        if (transport != null) {

            disposeTransport(transport);

            boolean reconnectOk = false;
            synchronized (reconnectMutex) {
                if (canReconnect()) {
                    reconnectOk = true;
                }
                LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed, reason:  " + e
                        + (reconnectOk ? "," : ", not")  +" attempting to automatically reconnect");

                initialized = false;
                failedConnectTransportURI = connectedTransportURI;
                connectedTransportURI = null;
                connected = false;

                // notify before any reconnect attempt so ack state can be whacked
                if (transportListener != null) {
                    transportListener.transportInterupted();
                }

                if (reconnectOk) {
                    // Forget the failed broker URI before waking the reconnect task.
                    updated.remove(failedConnectTransportURI);
                    reconnectTask.wakeup();
                } else if (!isDisposed()) {
                    propagateFailureToExceptionListener(e);
                }
            }
        }
    }
277    
278        private boolean canReconnect() {
279            return started && 0 != calculateReconnectAttemptLimit();
280        }
281    
282        public final void handleConnectionControl(ConnectionControl control) {
283            String reconnectStr = control.getReconnectTo();
284            if (reconnectStr != null) {
285                reconnectStr = reconnectStr.trim();
286                if (reconnectStr.length() > 0) {
287                    try {
288                        URI uri = new URI(reconnectStr);
289                        if (isReconnectSupported()) {
290                            reconnect(uri);
291                            LOG.info("Reconnected to: " + uri);
292                        }
293                    } catch (Exception e) {
294                        LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
295                    }
296                }
297            }
298            processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
299        }
300    
301        private final void processNewTransports(boolean rebalance, String newTransports) {
302            if (newTransports != null) {
303                newTransports = newTransports.trim();
304                if (newTransports.length() > 0 && isUpdateURIsSupported()) {
305                    List<URI> list = new ArrayList<URI>();
306                    StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
307                    while (tokenizer.hasMoreTokens()) {
308                        String str = tokenizer.nextToken();
309                        try {
310                            URI uri = new URI(str);
311                            list.add(uri);
312                        } catch (Exception e) {
313                            LOG.error("Failed to parse broker address: " + str, e);
314                        }
315                    }
316                    if (list.isEmpty() == false) {
317                        try {
318                            updateURIs(rebalance, list.toArray(new URI[list.size()]));
319                        } catch (IOException e) {
320                            LOG.error("Failed to update transport URI's from: " + newTransports, e);
321                        }
322                    }
323                }
324            }
325        }
326    
327        public void start() throws Exception {
328            synchronized (reconnectMutex) {
329                if (LOG.isDebugEnabled()) {
330                    LOG.debug("Started " + this);
331                }
332                if (started) {
333                    return;
334                }
335                started = true;
336                stateTracker.setMaxCacheSize(getMaxCacheSize());
337                stateTracker.setTrackMessages(isTrackMessages());
338                stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
339                if (connectedTransport.get() != null) {
340                    stateTracker.restore(connectedTransport.get());
341                } else {
342                    reconnect(false);
343                }
344            }
345        }
346    
347        public void stop() throws Exception {
348            Transport transportToStop = null;
349            List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
350    
351            try {
352                synchronized (reconnectMutex) {
353                    if (LOG.isDebugEnabled()) {
354                        LOG.debug("Stopped " + this);
355                    }
356                    if (!started) {
357                        return;
358                    }
359                    started = false;
360                    disposed = true;
361                    connected = false;
362    
363                    if (connectedTransport.get() != null) {
364                        transportToStop = connectedTransport.getAndSet(null);
365                    }
366                    reconnectMutex.notifyAll();
367                }
368                synchronized (sleepMutex) {
369                    sleepMutex.notifyAll();
370                }
371            } finally {
372                reconnectTask.shutdown();
373                reconnectTaskFactory.shutdownNow();
374            }
375    
376            synchronized(backupMutex) {
377                for (BackupTransport backup : backups) {
378                    backup.setDisposed(true);
379                    Transport transport = backup.getTransport();
380                    if (transport != null) {
381                        transport.setTransportListener(disposedListener);
382                        backupsToStop.add(transport);
383                    }
384                }
385                backups.clear();
386            }
387            for (Transport transport : backupsToStop) {
388                try {
389                    if (LOG.isTraceEnabled()) {
390                        LOG.trace("Stopped backup: " + transport);
391                    }
392                    disposeTransport(transport);
393                } catch (Exception e) {
394                }
395            }
396            if (transportToStop != null) {
397                transportToStop.stop();
398            }
399        }
400    
401        public long getInitialReconnectDelay() {
402            return initialReconnectDelay;
403        }
404    
405        public void setInitialReconnectDelay(long initialReconnectDelay) {
406            this.initialReconnectDelay = initialReconnectDelay;
407        }
408    
409        public long getMaxReconnectDelay() {
410            return maxReconnectDelay;
411        }
412    
413        public void setMaxReconnectDelay(long maxReconnectDelay) {
414            this.maxReconnectDelay = maxReconnectDelay;
415        }
416    
417        public long getReconnectDelay() {
418            return reconnectDelay;
419        }
420    
421        public void setReconnectDelay(long reconnectDelay) {
422            this.reconnectDelay = reconnectDelay;
423        }
424    
425        public double getReconnectDelayExponent() {
426            return backOffMultiplier;
427        }
428    
429        public void setReconnectDelayExponent(double reconnectDelayExponent) {
430            this.backOffMultiplier = reconnectDelayExponent;
431        }
432    
433        public Transport getConnectedTransport() {
434            return connectedTransport.get();
435        }
436    
437        public URI getConnectedTransportURI() {
438            return connectedTransportURI;
439        }
440    
441        public int getMaxReconnectAttempts() {
442            return maxReconnectAttempts;
443        }
444    
445        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
446            this.maxReconnectAttempts = maxReconnectAttempts;
447        }
448    
449        public int getStartupMaxReconnectAttempts() {
450            return this.startupMaxReconnectAttempts;
451        }
452    
453        public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
454            this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
455        }
456    
457        public long getTimeout() {
458            return timeout;
459        }
460    
461        public void setTimeout(long timeout) {
462            this.timeout = timeout;
463        }
464    
465        /**
466         * @return Returns the randomize.
467         */
468        public boolean isRandomize() {
469            return randomize;
470        }
471    
472        /**
473         * @param randomize The randomize to set.
474         */
475        public void setRandomize(boolean randomize) {
476            this.randomize = randomize;
477        }
478    
479        public boolean isBackup() {
480            return backup;
481        }
482    
483        public void setBackup(boolean backup) {
484            this.backup = backup;
485        }
486    
487        public int getBackupPoolSize() {
488            return backupPoolSize;
489        }
490    
491        public void setBackupPoolSize(int backupPoolSize) {
492            this.backupPoolSize = backupPoolSize;
493        }
494    
495        public int getCurrentBackups() {
496            return this.backups.size();
497        }
498    
499        public boolean isTrackMessages() {
500            return trackMessages;
501        }
502    
503        public void setTrackMessages(boolean trackMessages) {
504            this.trackMessages = trackMessages;
505        }
506    
507        public boolean isTrackTransactionProducers() {
508            return this.trackTransactionProducers;
509        }
510    
511        public void setTrackTransactionProducers(boolean trackTransactionProducers) {
512            this.trackTransactionProducers = trackTransactionProducers;
513        }
514    
515        public int getMaxCacheSize() {
516            return maxCacheSize;
517        }
518    
519        public void setMaxCacheSize(int maxCacheSize) {
520            this.maxCacheSize = maxCacheSize;
521        }
522    
523        public boolean isPriorityBackup() {
524            return priorityBackup;
525        }
526    
527        public void setPriorityBackup(boolean priorityBackup) {
528            this.priorityBackup = priorityBackup;
529        }
530    
531        public void setPriorityURIs(String priorityURIs) {
532            StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
533            while (tokenizer.hasMoreTokens()) {
534                String str = tokenizer.nextToken();
535                try {
536                    URI uri = new URI(str);
537                    priorityList.add(uri);
538                } catch (Exception e) {
539                    LOG.error("Failed to parse broker address: " + str, e);
540                }
541            }
542        }
543    
544        public void oneway(Object o) throws IOException {
545    
546            Command command = (Command) o;
547            Exception error = null;
548            try {
549    
550                synchronized (reconnectMutex) {
551    
552                    if (command != null && connectedTransport.get() == null) {
553                        if (command.isShutdownInfo()) {
554                            // Skipping send of ShutdownInfo command when not connected.
555                            return;
556                        } else if (command instanceof RemoveInfo || command.isMessageAck()) {
557                            // Simulate response to RemoveInfo command or MessageAck (as it will be stale)
558                            stateTracker.track(command);
559                            if (command.isResponseRequired()) {
560                                Response response = new Response();
561                                response.setCorrelationId(command.getCommandId());
562                                myTransportListener.onCommand(response);
563                            }
564                            return;
565                        } else if (command instanceof MessagePull) {
566                            // Simulate response to MessagePull if timed as we can't honor that now.
567                            MessagePull pullRequest = (MessagePull) command;
568                            if (pullRequest.getTimeout() != 0) {
569                                MessageDispatch dispatch = new MessageDispatch();
570                                dispatch.setConsumerId(pullRequest.getConsumerId());
571                                dispatch.setDestination(pullRequest.getDestination());
572                                myTransportListener.onCommand(dispatch);
573                            }
574                            return;
575                        }
576                    }
577    
578                    // Keep trying until the message is sent.
579                    for (int i = 0; !disposed; i++) {
580                        try {
581    
582                            // Wait for transport to be connected.
583                            Transport transport = connectedTransport.get();
584                            long start = System.currentTimeMillis();
585                            boolean timedout = false;
586                            while (transport == null && !disposed && connectionFailure == null
587                                    && !Thread.currentThread().isInterrupted()) {
588                                if (LOG.isTraceEnabled()) {
589                                    LOG.trace("Waiting for transport to reconnect..: " + command);
590                                }
591                                long end = System.currentTimeMillis();
592                                if (timeout > 0 && (end - start > timeout)) {
593                                    timedout = true;
594                                    if (LOG.isInfoEnabled()) {
595                                        LOG.info("Failover timed out after " + (end - start) + "ms");
596                                    }
597                                    break;
598                                }
599                                try {
600                                    reconnectMutex.wait(100);
601                                } catch (InterruptedException e) {
602                                    Thread.currentThread().interrupt();
603                                    if (LOG.isDebugEnabled()) {
604                                        LOG.debug("Interupted: " + e, e);
605                                    }
606                                }
607                                transport = connectedTransport.get();
608                            }
609    
610                            if (transport == null) {
611                                // Previous loop may have exited due to use being
612                                // disposed.
613                                if (disposed) {
614                                    error = new IOException("Transport disposed.");
615                                } else if (connectionFailure != null) {
616                                    error = connectionFailure;
617                                } else if (timedout == true) {
618                                    error = new IOException("Failover timeout of " + timeout + " ms reached.");
619                                } else {
620                                    error = new IOException("Unexpected failure.");
621                                }
622                                break;
623                            }
624    
625                            // If it was a request and it was not being tracked by
626                            // the state tracker,
627                            // then hold it in the requestMap so that we can replay
628                            // it later.
629                            Tracked tracked = stateTracker.track(command);
630                            synchronized (requestMap) {
631                                if (tracked != null && tracked.isWaitingForResponse()) {
632                                    requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
633                                } else if (tracked == null && command.isResponseRequired()) {
634                                    requestMap.put(Integer.valueOf(command.getCommandId()), command);
635                                }
636                            }
637    
638                            // Send the message.
639                            try {
640                                transport.oneway(command);
641                                stateTracker.trackBack(command);
642                            } catch (IOException e) {
643    
644                                // If the command was not tracked.. we will retry in
645                                // this method
646                                if (tracked == null) {
647    
648                                    // since we will retry in this method.. take it
649                                    // out of the request
650                                    // map so that it is not sent 2 times on
651                                    // recovery
652                                    if (command.isResponseRequired()) {
653                                        requestMap.remove(Integer.valueOf(command.getCommandId()));
654                                    }
655    
656                                    // Rethrow the exception so it will handled by
657                                    // the outer catch
658                                    throw e;
659                                } else {
660                                    // Handle the error but allow the method to return since the
661                                    // tracked commands are replayed on reconnect.
662                                    if (LOG.isDebugEnabled()) {
663                                        LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
664                                    }
665                                    handleTransportFailure(e);
666                                }
667                            }
668    
669                            return;
670    
671                        } catch (IOException e) {
672                            if (LOG.isDebugEnabled()) {
673                                LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
674                            }
675                            handleTransportFailure(e);
676                        }
677                    }
678                }
679            } catch (InterruptedException e) {
680                // Some one may be trying to stop our thread.
681                Thread.currentThread().interrupt();
682                throw new InterruptedIOException();
683            }
684    
685            if (!disposed) {
686                if (error != null) {
687                    if (error instanceof IOException) {
688                        throw (IOException) error;
689                    }
690                    throw IOExceptionSupport.create(error);
691                }
692            }
693        }
694    
695        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
696            throw new AssertionError("Unsupported Method");
697        }
698    
699        public Object request(Object command) throws IOException {
700            throw new AssertionError("Unsupported Method");
701        }
702    
703        public Object request(Object command, int timeout) throws IOException {
704            throw new AssertionError("Unsupported Method");
705        }
706    
707        public void add(boolean rebalance, URI u[]) {
708            boolean newURI = false;
709            for (URI uri : u) {
710                if (!contains(uri)) {
711                    uris.add(uri);
712                    newURI = true;
713                }
714            }
715            if (newURI) {
716                reconnect(rebalance);
717            }
718        }
719    
720        public void remove(boolean rebalance, URI u[]) {
721            for (URI uri : u) {
722                uris.remove(uri);
723            }
724            // rebalance is automatic if any connected to removed/stopped broker
725        }
726    
727        public void add(boolean rebalance, String u) {
728            try {
729                URI newURI = new URI(u);
730                if (contains(newURI) == false) {
731                    uris.add(newURI);
732                    reconnect(rebalance);
733                }
734    
735            } catch (Exception e) {
736                LOG.error("Failed to parse URI: " + u);
737            }
738        }
739    
740        public void reconnect(boolean rebalance) {
741            synchronized (reconnectMutex) {
742                if (started) {
743                    if (rebalance) {
744                        doRebalance = true;
745                    }
746                    LOG.debug("Waking up reconnect task");
747                    try {
748                        reconnectTask.wakeup();
749                    } catch (InterruptedException e) {
750                        Thread.currentThread().interrupt();
751                    }
752                } else {
753                    LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
754                }
755            }
756        }
757    
758        private List<URI> getConnectList() {
759            if (!updated.isEmpty()) {
760                if (failedConnectTransportURI != null) {
761                    boolean removed = updated.remove(failedConnectTransportURI);
762                    if (removed) {
763                        updated.add(failedConnectTransportURI);
764                    }
765                }
766                return updated;
767            }
768            ArrayList<URI> l = new ArrayList<URI>(uris);
769            boolean removed = false;
770            if (failedConnectTransportURI != null) {
771                removed = l.remove(failedConnectTransportURI);
772            }
773            if (randomize) {
774                // Randomly, reorder the list by random swapping
775                for (int i = 0; i < l.size(); i++) {
776                    int p = (int) (Math.random() * 100 % l.size());
777                    URI t = l.get(p);
778                    l.set(p, l.get(i));
779                    l.set(i, t);
780                }
781            }
782            if (removed) {
783                l.add(failedConnectTransportURI);
784            }
785            if (LOG.isDebugEnabled()) {
786                LOG.debug("urlList connectionList:" + l + ", from: " + uris);
787            }
788            return l;
789        }
790    
791        public TransportListener getTransportListener() {
792            return transportListener;
793        }
794    
795        public void setTransportListener(TransportListener commandListener) {
796            synchronized (listenerMutex) {
797                this.transportListener = commandListener;
798                listenerMutex.notifyAll();
799            }
800        }
801    
802        public <T> T narrow(Class<T> target) {
803    
804            if (target.isAssignableFrom(getClass())) {
805                return target.cast(this);
806            }
807            Transport transport = connectedTransport.get();
808            if (transport != null) {
809                return transport.narrow(target);
810            }
811            return null;
812    
813        }
814    
815        protected void restoreTransport(Transport t) throws Exception, IOException {
816            t.start();
817            // send information to the broker - informing it we are an ft client
818            ConnectionControl cc = new ConnectionControl();
819            cc.setFaultTolerant(true);
820            t.oneway(cc);
821            stateTracker.restore(t);
822            Map<Integer, Command> tmpMap = null;
823            synchronized (requestMap) {
824                tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
825            }
826            for (Command command : tmpMap.values()) {
827                if (LOG.isTraceEnabled()) {
828                    LOG.trace("restore requestMap, replay: " + command);
829                }
830                t.oneway(command);
831            }
832        }
833    
834        public boolean isUseExponentialBackOff() {
835            return useExponentialBackOff;
836        }
837    
838        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
839            this.useExponentialBackOff = useExponentialBackOff;
840        }
841    
842        @Override
843        public String toString() {
844            return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
845        }
846    
847        public String getRemoteAddress() {
848            Transport transport = connectedTransport.get();
849            if (transport != null) {
850                return transport.getRemoteAddress();
851            }
852            return null;
853        }
854    
855        public boolean isFaultTolerant() {
856            return true;
857        }
858    
859        private void doUpdateURIsFromDisk() {
860            // If updateURIsURL is specified, read the file and add any new
861            // transport URI's to this FailOverTransport.
862            // Note: Could track file timestamp to avoid unnecessary reading.
863            String fileURL = getUpdateURIsURL();
864            if (fileURL != null) {
865                BufferedReader in = null;
866                String newUris = null;
867                StringBuffer buffer = new StringBuffer();
868    
869                try {
870                    in = new BufferedReader(getURLStream(fileURL));
871                    while (true) {
872                        String line = in.readLine();
873                        if (line == null) {
874                            break;
875                        }
876                        buffer.append(line);
877                    }
878                    newUris = buffer.toString();
879                } catch (IOException ioe) {
880                    LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
881                } finally {
882                    if (in != null) {
883                        try {
884                            in.close();
885                        } catch (IOException ioe) {
886                            // ignore
887                        }
888                    }
889                }
890    
891                processNewTransports(isRebalanceUpdateURIs(), newUris);
892            }
893        }
894    
895        final boolean doReconnect() {
896            Exception failure = null;
897            synchronized (reconnectMutex) {
898    
899                // First ensure we are up to date.
900                doUpdateURIsFromDisk();
901    
902                if (disposed || connectionFailure != null) {
903                    reconnectMutex.notifyAll();
904                }
905                if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
906                    return false;
907                } else {
908                    List<URI> connectList = getConnectList();
909                    if (connectList.isEmpty()) {
910                        failure = new IOException("No uris available to connect to.");
911                    } else {
912                        if (doRebalance) {
913                            if (connectList.get(0).equals(connectedTransportURI)) {
914                                // already connected to first in the list, no need to rebalance
915                                doRebalance = false;
916                                return false;
917                            } else {
918                                if (LOG.isDebugEnabled()) {
919                                    LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
920                                }
921                                try {
922                                    Transport transport = this.connectedTransport.getAndSet(null);
923                                    if (transport != null) {
924                                        disposeTransport(transport);
925                                    }
926                                } catch (Exception e) {
927                                    if (LOG.isDebugEnabled()) {
928                                        LOG.debug("Caught an exception stopping existing transport for rebalance", e);
929                                    }
930                                }
931                            }
932                            doRebalance = false;
933                        }
934    
935                        resetReconnectDelay();
936    
937                        Transport transport = null;
938                        URI uri = null;
939    
940                        // If we have a backup already waiting lets try it.
941                        synchronized (backupMutex) {
942                            if ((priorityBackup || backup) && !backups.isEmpty()) {
943                                ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups);
944                                if (randomize) {
945                                    Collections.shuffle(l);
946                                }
947                                BackupTransport bt = l.remove(0);
948                                backups.remove(bt);
949                                transport = bt.getTransport();
950                                uri = bt.getUri();
951                                if (priorityBackup && priorityBackupAvailable) {
952                                    Transport old = this.connectedTransport.getAndSet(null);
953                                    if (transport != null) {
954                                        disposeTransport(old);
955                                    }
956                                    priorityBackupAvailable = false;
957                                }
958                            }
959                        }
960    
961                        // Sleep for the reconnectDelay if there's no backup and we aren't trying
962                        // for the first time, or we were disposed for some reason.
963                        if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
964                            synchronized (sleepMutex) {
965                                if (LOG.isDebugEnabled()) {
966                                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
967                                }
968                                try {
969                                    sleepMutex.wait(reconnectDelay);
970                                } catch (InterruptedException e) {
971                                    Thread.currentThread().interrupt();
972                                }
973                            }
974                        }
975    
976                        Iterator<URI> iter = connectList.iterator();
977                        while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
978    
979                            try {
980                                SslContext.setCurrentSslContext(brokerSslContext);
981    
982                                // We could be starting with a backup and if so we wait to grab a
983                                // URI from the pool until next time around.
984                                if (transport == null) {
985                                    uri = iter.next();
986                                    transport = TransportFactory.compositeConnect(uri);
987                                }
988    
989                                if (LOG.isDebugEnabled()) {
990                                    LOG.debug("Attempting  " + connectFailures + "th  connect to: " + uri);
991                                }
992                                transport.setTransportListener(myTransportListener);
993                                transport.start();
994    
995                                if (started &&  !firstConnection) {
996                                    restoreTransport(transport);
997                                }
998    
999                                 if (LOG.isDebugEnabled()) {
1000                                    LOG.debug("Connection established");
1001                                 }
1002                                reconnectDelay = initialReconnectDelay;
1003                                connectedTransportURI = uri;
1004                                connectedTransport.set(transport);
1005                                reconnectMutex.notifyAll();
1006                                connectFailures = 0;
1007    
1008                                // Make sure on initial startup, that the transportListener
1009                                // has been initialized for this instance.
1010                                synchronized (listenerMutex) {
1011                                    if (transportListener == null) {
1012                                        try {
1013                                            // if it isn't set after 2secs - it probably never will be
1014                                            listenerMutex.wait(2000);
1015                                        } catch (InterruptedException ex) {
1016                                        }
1017                                    }
1018                                }
1019    
1020                                if (transportListener != null) {
1021                                    transportListener.transportResumed();
1022                                } else {
1023                                    if (LOG.isDebugEnabled()) {
1024                                        LOG.debug("transport resumed by transport listener not set");
1025                                    }
1026                                }
1027    
1028                                if (firstConnection) {
1029                                    firstConnection = false;
1030                                    LOG.info("Successfully connected to " + uri);
1031                                } else {
1032                                    LOG.info("Successfully reconnected to " + uri);
1033                                }
1034    
1035                                connected = true;
1036                                return false;
1037                            } catch (Exception e) {
1038                                failure = e;
1039                                if (LOG.isDebugEnabled()) {
1040                                    LOG.debug("Connect fail to: " + uri + ", reason: " + e);
1041                                }
1042                                if (transport != null) {
1043                                    try {
1044                                        transport.stop();
1045                                        transport = null;
1046                                    } catch (Exception ee) {
1047                                        if (LOG.isDebugEnabled()) {
1048                                            LOG.debug("Stop of failed transport: " + transport +
1049                                                      " failed with reason: " + ee);
1050                                        }
1051                                    }
1052                                }
1053                            } finally {
1054                                SslContext.setCurrentSslContext(null);
1055                            }
1056                        }
1057                    }
1058                }
1059    
1060                int reconnectLimit = calculateReconnectAttemptLimit();
1061    
1062                connectFailures++;
1063                if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
1064                    LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
1065                    connectionFailure = failure;
1066    
1067                    // Make sure on initial startup, that the transportListener has been
1068                    // initialized for this instance.
1069                    synchronized (listenerMutex) {
1070                        if (transportListener == null) {
1071                            try {
1072                                listenerMutex.wait(2000);
1073                            } catch (InterruptedException ex) {
1074                            }
1075                        }
1076                    }
1077    
1078                    propagateFailureToExceptionListener(connectionFailure);
1079                    return false;
1080                }
1081            }
1082    
1083            if (!disposed) {
1084                doDelay();
1085            }
1086    
1087            return !disposed;
1088        }
1089    
1090        private void doDelay() {
1091            if (reconnectDelay > 0) {
1092                synchronized (sleepMutex) {
1093                    if (LOG.isDebugEnabled()) {
1094                        LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
1095                    }
1096                    try {
1097                        sleepMutex.wait(reconnectDelay);
1098                    } catch (InterruptedException e) {
1099                        Thread.currentThread().interrupt();
1100                    }
1101                }
1102            }
1103    
1104            if (useExponentialBackOff) {
1105                // Exponential increment of reconnect delay.
1106                reconnectDelay *= backOffMultiplier;
1107                if (reconnectDelay > maxReconnectDelay) {
1108                    reconnectDelay = maxReconnectDelay;
1109                }
1110            }
1111        }
1112    
1113        private void resetReconnectDelay() {
1114            if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
1115                reconnectDelay = initialReconnectDelay;
1116            }
1117        }
1118    
1119        /*
1120          * called with reconnectMutex held
1121         */
1122        private void propagateFailureToExceptionListener(Exception exception) {
1123            if (transportListener != null) {
1124                if (exception instanceof IOException) {
1125                    transportListener.onException((IOException)exception);
1126                } else {
1127                    transportListener.onException(IOExceptionSupport.create(exception));
1128                }
1129            }
1130            reconnectMutex.notifyAll();
1131        }
1132    
1133        private int calculateReconnectAttemptLimit() {
1134            int maxReconnectValue = this.maxReconnectAttempts;
1135            if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
1136                maxReconnectValue = this.startupMaxReconnectAttempts;
1137            }
1138            return maxReconnectValue;
1139        }
1140    
1141        final boolean buildBackups() {
1142            synchronized (backupMutex) {
1143                if (!disposed && (backup || priorityBackup) && backups.size() < backupPoolSize) {
1144                    ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
1145                    List<URI> connectList = getConnectList();
1146                    for (URI uri: connectList) {
1147                        if (!backupList.contains(uri)) {
1148                            backupList.add(uri);
1149                        }
1150                    }
1151                    // removed disposed backups
1152                    List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
1153                    for (BackupTransport bt : backups) {
1154                        if (bt.isDisposed()) {
1155                            disposedList.add(bt);
1156                        }
1157                    }
1158                    backups.removeAll(disposedList);
1159                    disposedList.clear();
1160                    for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && backups.size() < backupPoolSize; ) {
1161                        URI uri = iter.next();
1162                        if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
1163                            try {
1164                                SslContext.setCurrentSslContext(brokerSslContext);
1165                                BackupTransport bt = new BackupTransport(this);
1166                                bt.setUri(uri);
1167                                if (!backups.contains(bt)) {
1168                                    Transport t = TransportFactory.compositeConnect(uri);
1169                                    t.setTransportListener(bt);
1170                                    t.start();
1171                                    bt.setTransport(t);
1172                                    backups.add(bt);
1173                                    if (priorityBackup && isPriority(uri)) {
1174                                       priorityBackupAvailable = true;
1175                                    }
1176                                }
1177                            } catch (Exception e) {
1178                                LOG.debug("Failed to build backup ", e);
1179                            } finally {
1180                                SslContext.setCurrentSslContext(null);
1181                            }
1182                        }
1183                    }
1184                }
1185            }
1186            return false;
1187        }
1188    
1189        protected boolean isPriority(URI uri) {
1190            if (!priorityList.isEmpty()) {
1191                return priorityList.contains(uri);
1192            }
1193            return uris.indexOf(uri) == 0;
1194        }
1195    
1196        public boolean isDisposed() {
1197            return disposed;
1198        }
1199    
1200        public boolean isConnected() {
1201            return connected;
1202        }
1203    
1204        public void reconnect(URI uri) throws IOException {
1205            add(true, new URI[]{uri});
1206        }
1207    
1208        public boolean isReconnectSupported() {
1209            return this.reconnectSupported;
1210        }
1211    
1212        public void setReconnectSupported(boolean value) {
1213            this.reconnectSupported = value;
1214        }
1215    
1216        public boolean isUpdateURIsSupported() {
1217            return this.updateURIsSupported;
1218        }
1219    
1220        public void setUpdateURIsSupported(boolean value) {
1221            this.updateURIsSupported = value;
1222        }
1223    
1224        public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1225            if (isUpdateURIsSupported()) {
1226                HashSet<URI> copy = new HashSet<URI>(this.updated);
1227                updated.clear();
1228                if (updatedURIs != null && updatedURIs.length > 0) {
1229                    for (URI uri : updatedURIs) {
1230                        if (uri != null && !updated.contains(uri)) {
1231                            updated.add(uri);
1232                        }
1233                    }
1234                    if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) {
1235                        buildBackups();
1236                        synchronized (reconnectMutex) {
1237                            reconnect(rebalance);
1238                        }
1239                    }
1240                }
1241            }
1242        }
1243    
1244        /**
1245         * @return the updateURIsURL
1246         */
1247        public String getUpdateURIsURL() {
1248            return this.updateURIsURL;
1249        }
1250    
1251        /**
1252         * @param updateURIsURL the updateURIsURL to set
1253         */
1254        public void setUpdateURIsURL(String updateURIsURL) {
1255            this.updateURIsURL = updateURIsURL;
1256        }
1257    
1258        /**
1259         * @return the rebalanceUpdateURIs
1260         */
1261        public boolean isRebalanceUpdateURIs() {
1262            return this.rebalanceUpdateURIs;
1263        }
1264    
1265        /**
1266         * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
1267         */
1268        public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
1269            this.rebalanceUpdateURIs = rebalanceUpdateURIs;
1270        }
1271    
1272        public int getReceiveCounter() {
1273            Transport transport = connectedTransport.get();
1274            if (transport == null) {
1275                return 0;
1276            }
1277            return transport.getReceiveCounter();
1278        }
1279    
1280        public int getConnectFailures() {
1281            return connectFailures;
1282        }
1283    
1284        public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
1285            synchronized (reconnectMutex) {
1286                stateTracker.connectionInterruptProcessingComplete(this, connectionId);
1287            }
1288        }
1289    
1290        public ConnectionStateTracker getStateTracker() {
1291            return stateTracker;
1292        }
1293    
1294        private boolean contains(URI newURI) {
1295            boolean result = false;
1296            for (URI uri : uris) {
1297                if (newURI.getPort() == uri.getPort()) {
1298                    InetAddress newAddr = null;
1299                    InetAddress addr = null;
1300                    try {
1301                        newAddr = InetAddress.getByName(newURI.getHost());
1302                        addr = InetAddress.getByName(uri.getHost());
1303                    } catch(IOException e) {
1304    
1305                        if (newAddr == null) {
1306                            LOG.error("Failed to Lookup INetAddress for URI[ " + newURI + " ] : " + e);
1307                        } else {
1308                            LOG.error("Failed to Lookup INetAddress for URI[ " + uri + " ] : " + e);
1309                        }
1310    
1311                        if (newURI.getHost().equalsIgnoreCase(uri.getHost())) {
1312                            result = true;
1313                            break;
1314                        } else {
1315                            continue;
1316                        }
1317                    }
1318    
1319                    if (addr.equals(newAddr)) {
1320                        result = true;
1321                        break;
1322                    }
1323                }
1324            }
1325    
1326            return result;
1327        }
1328    
1329        private InputStreamReader getURLStream(String path) throws IOException {
1330            InputStreamReader result = null;
1331            URL url = null;
1332            try {
1333                url = new URL(path);
1334                result = new InputStreamReader(url.openStream());
1335            } catch (MalformedURLException e) {
1336                // ignore - it could be a path to a a local file
1337            }
1338            if (result == null) {
1339                result = new FileReader(path);
1340            }
1341            return result;
1342        }
1343    }