001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.failover;
018    
019    import java.io.BufferedReader;
020    import java.io.FileReader;
021    import java.io.IOException;
022    import java.io.InputStreamReader;
023    import java.io.InterruptedIOException;
024    import java.net.*;
025    import java.util.*;
026    import java.util.concurrent.CopyOnWriteArrayList;
027    import java.util.concurrent.atomic.AtomicReference;
028    
029    import org.apache.activemq.broker.SslContext;
030    import org.apache.activemq.command.Command;
031    import org.apache.activemq.command.ConnectionControl;
032    import org.apache.activemq.command.ConnectionId;
033    import org.apache.activemq.command.MessageDispatch;
034    import org.apache.activemq.command.MessagePull;
035    import org.apache.activemq.command.RemoveInfo;
036    import org.apache.activemq.command.Response;
037    import org.apache.activemq.state.ConnectionStateTracker;
038    import org.apache.activemq.state.Tracked;
039    import org.apache.activemq.thread.Task;
040    import org.apache.activemq.thread.TaskRunner;
041    import org.apache.activemq.thread.TaskRunnerFactory;
042    import org.apache.activemq.transport.CompositeTransport;
043    import org.apache.activemq.transport.DefaultTransportListener;
044    import org.apache.activemq.transport.FutureResponse;
045    import org.apache.activemq.transport.ResponseCallback;
046    import org.apache.activemq.transport.Transport;
047    import org.apache.activemq.transport.TransportFactory;
048    import org.apache.activemq.transport.TransportListener;
049    import org.apache.activemq.util.IOExceptionSupport;
050    import org.apache.activemq.util.ServiceSupport;
051    import org.apache.activemq.util.URISupport;
052    import org.slf4j.Logger;
053    import org.slf4j.LoggerFactory;
054    
055    /**
056     * A Transport that is made reliable by being able to fail over to another
057     * transport when a transport failure is detected.
058     */
059    public class FailoverTransport implements CompositeTransport {
060    
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
    // Default for both initialReconnectDelay and reconnectDelay (presumably milliseconds — TODO confirm against doDelay()).
    private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
    // Sentinel meaning "no limit" for timeout and the reconnect-attempt settings.
    private static final int INFINITE = -1;
    // Application-level listener that commands and events from the connected transport are forwarded to.
    private TransportListener transportListener;
    // Set by stop(); once true, oneway() stops retrying and the reconnect task stops reconnecting.
    private boolean disposed;
    private boolean connected;
    // Broker URIs this transport can fail over between.
    private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
    // Additional broker URIs (presumably learned from broker updates — TODO confirm); the URI of a
    // failed transport is removed from here in handleTransportFailure().
    private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();

    // Guards connection state; oneway() waits on it for a transport and stop() notifies it.
    private final Object reconnectMutex = new Object();
    // Held by the reconnect task while it reconnects and by stop() while it clears the backups.
    private final Object backupMutex = new Object();
    // Notified by stop() to wake anything sleeping on it (presumably the reconnect delay — TODO confirm).
    private final Object sleepMutex = new Object();
    // NOTE(review): not referenced in the visible part of this file.
    private final Object listenerMutex = new Object();
    // Records connection state so that it can be restored onto a (new) transport.
    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
    // Requests awaiting a response, keyed by command id; intended to be guarded by synchronized (requestMap).
    private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();

    // URI of the currently connected transport; null after a failure.
    private URI connectedTransportURI;
    // URI of the transport that most recently failed.
    private URI failedConnectTransportURI;
    // The currently connected transport; null while disconnected.
    private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
    // Runner for the asynchronous reconnect task; both are shut down in stop().
    private final TaskRunnerFactory reconnectTaskFactory;
    private final TaskRunner reconnectTask;
    private boolean started;
    // True once any command has been received on the current connection; reset on transport failure.
    private boolean initialized;
    private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    private long maxReconnectDelay = 1000 * 30;
    // Exposed through the "reconnectDelayExponent" accessors.
    private double backOffMultiplier = 2d;
    // Maximum time in ms that oneway() waits for a transport; values <= 0 mean wait indefinitely.
    private long timeout = INFINITE;
    private boolean useExponentialBackOff = true;
    private boolean randomize = true;
    private int maxReconnectAttempts = INFINITE;
    private int startupMaxReconnectAttempts = INFINITE;
    private int connectFailures;
    private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    // When non-null, oneway() stops waiting for a transport and fails with this error.
    private Exception connectionFailure;
    private boolean firstConnection = true;
    // optionally always have a backup created
    private boolean backup = false;
    // Backup transports; disposed and cleared by stop().
    private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
    private int backupPoolSize = 1;
    // The next three settings are pushed into the state tracker when start() runs.
    private boolean trackMessages = false;
    private boolean trackTransactionProducers = true;
    private int maxCacheSize = 128 * 1024;
    // Listener with no overrides; installed on transports being disposed so their late events go nowhere.
    private final TransportListener disposedListener = new DefaultTransportListener() {
    };
    // Listener built by createTransportListener(); oneway() also feeds it locally simulated responses.
    private final TransportListener myTransportListener = createTransportListener();
    private boolean updateURIsSupported = true;
    private boolean reconnectSupported = true;
    // remember for reconnect thread
    private SslContext brokerSslContext;
    private String updateURIsURL = null;
    private boolean rebalanceUpdateURIs = true;
    // Set by reconnect(true); makes the reconnect task reconnect even while a transport is connected.
    private boolean doRebalance = false;
    // Reset on transport failure; while false (and priorityBackup is set) the reconnect task keeps rescheduling itself.
    private boolean connectedToPriority = false;

    private boolean priorityBackup = false;
    // Priority broker URIs, appended to by setPriorityURIs().
    private final ArrayList<URI> priorityList = new ArrayList<URI>();
    // When true, the reconnect task reconnects even while a transport is connected.
    private boolean priorityBackupAvailable = false;
    private String nestedExtraQueryOptions;
119    
120        public FailoverTransport() throws InterruptedIOException {
121            brokerSslContext = SslContext.getCurrentSslContext();
122            stateTracker.setTrackTransactions(true);
123            // Setup a task that is used to reconnect the a connection async.
124            reconnectTaskFactory = new TaskRunnerFactory();
125            reconnectTaskFactory.init();
126            reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
127                @Override
128                public boolean iterate() {
129                    boolean result = false;
130                    if (!started) {
131                        return result;
132                    }
133                    boolean buildBackup = true;
134                    synchronized (backupMutex) {
135                        if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
136                            result = doReconnect();
137                            buildBackup = false;
138                        }
139                    }
140                    if (buildBackup) {
141                        buildBackups();
142                        if (priorityBackup && !connectedToPriority) {
143                            try {
144                                doDelay();
145                                if (reconnectTask == null) {
146                                    return true;
147                                }
148                                reconnectTask.wakeup();
149                            } catch (InterruptedException e) {
150                                LOG.debug("Reconnect task has been interrupted.", e);
151                            }
152                        }
153                    } else {
154                        // build backups on the next iteration
155                        buildBackup = true;
156                        try {
157                            if (reconnectTask == null) {
158                                return true;
159                            }
160                            reconnectTask.wakeup();
161                        } catch (InterruptedException e) {
162                            LOG.debug("Reconnect task has been interrupted.", e);
163                        }
164                    }
165                    return result;
166                }
167    
168            }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
169        }
170    
171        TransportListener createTransportListener() {
172            return new TransportListener() {
173                @Override
174                public void onCommand(Object o) {
175                    Command command = (Command) o;
176                    if (command == null) {
177                        return;
178                    }
179                    if (command.isResponse()) {
180                        Object object = null;
181                        synchronized (requestMap) {
182                            object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
183                        }
184                        if (object != null && object.getClass() == Tracked.class) {
185                            ((Tracked) object).onResponses(command);
186                        }
187                    }
188                    if (!initialized) {
189                        initialized = true;
190                    }
191    
192                    if (command.isConnectionControl()) {
193                        handleConnectionControl((ConnectionControl) command);
194                    }
195                    if (transportListener != null) {
196                        transportListener.onCommand(command);
197                    }
198                }
199    
200                @Override
201                public void onException(IOException error) {
202                    try {
203                        handleTransportFailure(error);
204                    } catch (InterruptedException e) {
205                        Thread.currentThread().interrupt();
206                        transportListener.onException(new InterruptedIOException());
207                    }
208                }
209    
210                @Override
211                public void transportInterupted() {
212                    if (transportListener != null) {
213                        transportListener.transportInterupted();
214                    }
215                }
216    
217                @Override
218                public void transportResumed() {
219                    if (transportListener != null) {
220                        transportListener.transportResumed();
221                    }
222                }
223            };
224        }
225    
226        public final void disposeTransport(Transport transport) {
227            transport.setTransportListener(disposedListener);
228            ServiceSupport.dispose(transport);
229        }
230    
    /**
     * Handles the failure of the connected transport.
     * <p>
     * Claims the connected transport (re-checking under {@code reconnectMutex}
     * if it was already cleared) and disposes it, then clears the connection
     * state. If a reconnect is allowed, the application listener is told the
     * transport was interrupted and the reconnect task is woken. Otherwise,
     * unless this transport is disposed, the error is propagated to the
     * exception listener. If no transport was connected, nothing happens beyond
     * trace logging.
     *
     * @param e the I/O error that caused the failure
     * @throws InterruptedException e.g. from {@code reconnectTask.wakeup()}
     */
    public final void handleTransportFailure(IOException e) throws InterruptedException {
        if (LOG.isTraceEnabled()) {
            LOG.trace(this + " handleTransportFailure: " + e);
        }
        // Fast path: atomically claim the transport without taking the lock.
        Transport transport = connectedTransport.getAndSet(null);
        if (transport == null) {
            // sync with possible in progress reconnect
            synchronized (reconnectMutex) {
                transport = connectedTransport.getAndSet(null);
            }
        }
        // Only the caller that claimed the transport performs the failover handling.
        if (transport != null) {

            disposeTransport(transport);

            boolean reconnectOk = false;
            synchronized (reconnectMutex) {
                if (canReconnect()) {
                    reconnectOk = true;
                }
                LOG.warn("Transport (" + transport.getRemoteAddress() + ") failed, reason:  " + e
                        + (reconnectOk ? "," : ", not")  +" attempting to automatically reconnect");

                // Reset per-connection state; remember which URI failed.
                initialized = false;
                failedConnectTransportURI = connectedTransportURI;
                connectedTransportURI = null;
                connected = false;
                connectedToPriority = false;

                if (reconnectOk) {
                    // notify before any reconnect attempt so ack state can be whacked
                    if (transportListener != null) {
                        transportListener.transportInterupted();
                    }

                    // Drop the failed URI from the updated list before waking the reconnect task.
                    updated.remove(failedConnectTransportURI);
                    reconnectTask.wakeup();
                } else if (!isDisposed()) {
                    propagateFailureToExceptionListener(e);
                }
            }
        }
    }
274    
275        private boolean canReconnect() {
276            return started && 0 != calculateReconnectAttemptLimit();
277        }
278    
279        public final void handleConnectionControl(ConnectionControl control) {
280            String reconnectStr = control.getReconnectTo();
281            if (LOG.isTraceEnabled()) {
282                LOG.trace("Received ConnectionControl: {}", control);
283            }
284    
285            if (reconnectStr != null) {
286                reconnectStr = reconnectStr.trim();
287                if (reconnectStr.length() > 0) {
288                    try {
289                        URI uri = new URI(reconnectStr);
290                        if (isReconnectSupported()) {
291                            reconnect(uri);
292                            LOG.info("Reconnected to: " + uri);
293                        }
294                    } catch (Exception e) {
295                        LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
296                    }
297                }
298            }
299            processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
300        }
301    
302        private final void processNewTransports(boolean rebalance, String newTransports) {
303            if (newTransports != null) {
304                newTransports = newTransports.trim();
305                if (newTransports.length() > 0 && isUpdateURIsSupported()) {
306                    List<URI> list = new ArrayList<URI>();
307                    StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
308                    while (tokenizer.hasMoreTokens()) {
309                        String str = tokenizer.nextToken();
310                        try {
311                            URI uri = new URI(str);
312                            list.add(uri);
313                        } catch (Exception e) {
314                            LOG.error("Failed to parse broker address: " + str, e);
315                        }
316                    }
317                    if (list.isEmpty() == false) {
318                        try {
319                            updateURIs(rebalance, list.toArray(new URI[list.size()]));
320                        } catch (IOException e) {
321                            LOG.error("Failed to update transport URI's from: " + newTransports, e);
322                        }
323                    }
324                }
325            }
326        }
327    
328        @Override
329        public void start() throws Exception {
330            synchronized (reconnectMutex) {
331                if (LOG.isDebugEnabled()) {
332                    LOG.debug("Started " + this);
333                }
334                if (started) {
335                    return;
336                }
337                started = true;
338                stateTracker.setMaxCacheSize(getMaxCacheSize());
339                stateTracker.setTrackMessages(isTrackMessages());
340                stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
341                if (connectedTransport.get() != null) {
342                    stateTracker.restore(connectedTransport.get());
343                } else {
344                    reconnect(false);
345                }
346            }
347        }
348    
349        @Override
350        public void stop() throws Exception {
351            Transport transportToStop = null;
352            List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
353    
354            try {
355                synchronized (reconnectMutex) {
356                    if (LOG.isDebugEnabled()) {
357                        LOG.debug("Stopped " + this);
358                    }
359                    if (!started) {
360                        return;
361                    }
362                    started = false;
363                    disposed = true;
364                    connected = false;
365    
366                    if (connectedTransport.get() != null) {
367                        transportToStop = connectedTransport.getAndSet(null);
368                    }
369                    reconnectMutex.notifyAll();
370                }
371                synchronized (sleepMutex) {
372                    sleepMutex.notifyAll();
373                }
374            } finally {
375                reconnectTask.shutdown();
376                reconnectTaskFactory.shutdownNow();
377            }
378    
379            synchronized(backupMutex) {
380                for (BackupTransport backup : backups) {
381                    backup.setDisposed(true);
382                    Transport transport = backup.getTransport();
383                    if (transport != null) {
384                        transport.setTransportListener(disposedListener);
385                        backupsToStop.add(transport);
386                    }
387                }
388                backups.clear();
389            }
390            for (Transport transport : backupsToStop) {
391                try {
392                    if (LOG.isTraceEnabled()) {
393                        LOG.trace("Stopped backup: " + transport);
394                    }
395                    disposeTransport(transport);
396                } catch (Exception e) {
397                }
398            }
399            if (transportToStop != null) {
400                transportToStop.stop();
401            }
402        }
403    
404        public long getInitialReconnectDelay() {
405            return initialReconnectDelay;
406        }
407    
408        public void setInitialReconnectDelay(long initialReconnectDelay) {
409            this.initialReconnectDelay = initialReconnectDelay;
410        }
411    
412        public long getMaxReconnectDelay() {
413            return maxReconnectDelay;
414        }
415    
416        public void setMaxReconnectDelay(long maxReconnectDelay) {
417            this.maxReconnectDelay = maxReconnectDelay;
418        }
419    
420        public long getReconnectDelay() {
421            return reconnectDelay;
422        }
423    
424        public void setReconnectDelay(long reconnectDelay) {
425            this.reconnectDelay = reconnectDelay;
426        }
427    
428        public double getReconnectDelayExponent() {
429            return backOffMultiplier;
430        }
431    
432        public void setReconnectDelayExponent(double reconnectDelayExponent) {
433            this.backOffMultiplier = reconnectDelayExponent;
434        }
435    
436        public Transport getConnectedTransport() {
437            return connectedTransport.get();
438        }
439    
440        public URI getConnectedTransportURI() {
441            return connectedTransportURI;
442        }
443    
444        public int getMaxReconnectAttempts() {
445            return maxReconnectAttempts;
446        }
447    
448        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
449            this.maxReconnectAttempts = maxReconnectAttempts;
450        }
451    
452        public int getStartupMaxReconnectAttempts() {
453            return this.startupMaxReconnectAttempts;
454        }
455    
456        public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
457            this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
458        }
459    
460        public long getTimeout() {
461            return timeout;
462        }
463    
464        public void setTimeout(long timeout) {
465            this.timeout = timeout;
466        }
467    
468        /**
469         * @return Returns the randomize.
470         */
471        public boolean isRandomize() {
472            return randomize;
473        }
474    
475        /**
476         * @param randomize The randomize to set.
477         */
478        public void setRandomize(boolean randomize) {
479            this.randomize = randomize;
480        }
481    
482        public boolean isBackup() {
483            return backup;
484        }
485    
486        public void setBackup(boolean backup) {
487            this.backup = backup;
488        }
489    
490        public int getBackupPoolSize() {
491            return backupPoolSize;
492        }
493    
494        public void setBackupPoolSize(int backupPoolSize) {
495            this.backupPoolSize = backupPoolSize;
496        }
497    
498        public int getCurrentBackups() {
499            return this.backups.size();
500        }
501    
502        public boolean isTrackMessages() {
503            return trackMessages;
504        }
505    
506        public void setTrackMessages(boolean trackMessages) {
507            this.trackMessages = trackMessages;
508        }
509    
510        public boolean isTrackTransactionProducers() {
511            return this.trackTransactionProducers;
512        }
513    
514        public void setTrackTransactionProducers(boolean trackTransactionProducers) {
515            this.trackTransactionProducers = trackTransactionProducers;
516        }
517    
518        public int getMaxCacheSize() {
519            return maxCacheSize;
520        }
521    
522        public void setMaxCacheSize(int maxCacheSize) {
523            this.maxCacheSize = maxCacheSize;
524        }
525    
526        public boolean isPriorityBackup() {
527            return priorityBackup;
528        }
529    
530        public void setPriorityBackup(boolean priorityBackup) {
531            this.priorityBackup = priorityBackup;
532        }
533    
534        public void setPriorityURIs(String priorityURIs) {
535            StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
536            while (tokenizer.hasMoreTokens()) {
537                String str = tokenizer.nextToken();
538                try {
539                    URI uri = new URI(str);
540                    priorityList.add(uri);
541                } catch (Exception e) {
542                    LOG.error("Failed to parse broker address: " + str, e);
543                }
544            }
545        }
546    
547        @Override
548        public void oneway(Object o) throws IOException {
549    
550            Command command = (Command) o;
551            Exception error = null;
552            try {
553    
554                synchronized (reconnectMutex) {
555    
556                    if (command != null && connectedTransport.get() == null) {
557                        if (command.isShutdownInfo()) {
558                            // Skipping send of ShutdownInfo command when not connected.
559                            return;
560                        } else if (command instanceof RemoveInfo || command.isMessageAck()) {
561                            // Simulate response to RemoveInfo command or MessageAck (as it will be stale)
562                            stateTracker.track(command);
563                            if (command.isResponseRequired()) {
564                                Response response = new Response();
565                                response.setCorrelationId(command.getCommandId());
566                                myTransportListener.onCommand(response);
567                            }
568                            return;
569                        } else if (command instanceof MessagePull) {
570                            // Simulate response to MessagePull if timed as we can't honor that now.
571                            MessagePull pullRequest = (MessagePull) command;
572                            if (pullRequest.getTimeout() != 0) {
573                                MessageDispatch dispatch = new MessageDispatch();
574                                dispatch.setConsumerId(pullRequest.getConsumerId());
575                                dispatch.setDestination(pullRequest.getDestination());
576                                myTransportListener.onCommand(dispatch);
577                            }
578                            return;
579                        }
580                    }
581    
582                    // Keep trying until the message is sent.
583                    for (int i = 0; !disposed; i++) {
584                        try {
585    
586                            // Wait for transport to be connected.
587                            Transport transport = connectedTransport.get();
588                            long start = System.currentTimeMillis();
589                            boolean timedout = false;
590                            while (transport == null && !disposed && connectionFailure == null
591                                    && !Thread.currentThread().isInterrupted()) {
592                                if (LOG.isTraceEnabled()) {
593                                    LOG.trace("Waiting for transport to reconnect..: " + command);
594                                }
595                                long end = System.currentTimeMillis();
596                                if (timeout > 0 && (end - start > timeout)) {
597                                    timedout = true;
598                                    if (LOG.isInfoEnabled()) {
599                                        LOG.info("Failover timed out after " + (end - start) + "ms");
600                                    }
601                                    break;
602                                }
603                                try {
604                                    reconnectMutex.wait(100);
605                                } catch (InterruptedException e) {
606                                    Thread.currentThread().interrupt();
607                                    if (LOG.isDebugEnabled()) {
608                                        LOG.debug("Interupted: " + e, e);
609                                    }
610                                }
611                                transport = connectedTransport.get();
612                            }
613    
614                            if (transport == null) {
615                                // Previous loop may have exited due to use being
616                                // disposed.
617                                if (disposed) {
618                                    error = new IOException("Transport disposed.");
619                                } else if (connectionFailure != null) {
620                                    error = connectionFailure;
621                                } else if (timedout == true) {
622                                    error = new IOException("Failover timeout of " + timeout + " ms reached.");
623                                } else {
624                                    error = new IOException("Unexpected failure.");
625                                }
626                                break;
627                            }
628    
629                            // If it was a request and it was not being tracked by
630                            // the state tracker,
631                            // then hold it in the requestMap so that we can replay
632                            // it later.
633                            Tracked tracked = stateTracker.track(command);
634                            synchronized (requestMap) {
635                                if (tracked != null && tracked.isWaitingForResponse()) {
636                                    requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
637                                } else if (tracked == null && command.isResponseRequired()) {
638                                    requestMap.put(Integer.valueOf(command.getCommandId()), command);
639                                }
640                            }
641    
642                            // Send the message.
643                            try {
644                                transport.oneway(command);
645                                stateTracker.trackBack(command);
646                            } catch (IOException e) {
647    
648                                // If the command was not tracked.. we will retry in
649                                // this method
650                                if (tracked == null) {
651    
652                                    // since we will retry in this method.. take it
653                                    // out of the request
654                                    // map so that it is not sent 2 times on
655                                    // recovery
656                                    if (command.isResponseRequired()) {
657                                        requestMap.remove(Integer.valueOf(command.getCommandId()));
658                                    }
659    
660                                    // Rethrow the exception so it will handled by
661                                    // the outer catch
662                                    throw e;
663                                } else {
664                                    // Handle the error but allow the method to return since the
665                                    // tracked commands are replayed on reconnect.
666                                    if (LOG.isDebugEnabled()) {
667                                        LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
668                                    }
669                                    handleTransportFailure(e);
670                                }
671                            }
672    
673                            return;
674    
675                        } catch (IOException e) {
676                            if (LOG.isDebugEnabled()) {
677                                LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
678                            }
679                            handleTransportFailure(e);
680                        }
681                    }
682                }
683            } catch (InterruptedException e) {
684                // Some one may be trying to stop our thread.
685                Thread.currentThread().interrupt();
686                throw new InterruptedIOException();
687            }
688    
689            if (!disposed) {
690                if (error != null) {
691                    if (error instanceof IOException) {
692                        throw (IOException) error;
693                    }
694                    throw IOExceptionSupport.create(error);
695                }
696            }
697        }
698    
699        @Override
700        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
701            throw new AssertionError("Unsupported Method");
702        }
703    
704        @Override
705        public Object request(Object command) throws IOException {
706            throw new AssertionError("Unsupported Method");
707        }
708    
709        @Override
710        public Object request(Object command, int timeout) throws IOException {
711            throw new AssertionError("Unsupported Method");
712        }
713    
714        @Override
715        public void add(boolean rebalance, URI u[]) {
716            boolean newURI = false;
717            for (URI uri : u) {
718                if (!contains(uri)) {
719                    uris.add(uri);
720                    newURI = true;
721                }
722            }
723            if (newURI) {
724                reconnect(rebalance);
725            }
726        }
727    
728        @Override
729        public void remove(boolean rebalance, URI u[]) {
730            for (URI uri : u) {
731                uris.remove(uri);
732            }
733            // rebalance is automatic if any connected to removed/stopped broker
734        }
735    
736        public void add(boolean rebalance, String u) {
737            try {
738                URI newURI = new URI(u);
739                if (contains(newURI) == false) {
740                    uris.add(newURI);
741                    reconnect(rebalance);
742                }
743    
744            } catch (Exception e) {
745                LOG.error("Failed to parse URI: " + u);
746            }
747        }
748    
749        public void reconnect(boolean rebalance) {
750            synchronized (reconnectMutex) {
751                if (started) {
752                    if (rebalance) {
753                        doRebalance = true;
754                    }
755                    LOG.debug("Waking up reconnect task");
756                    try {
757                        reconnectTask.wakeup();
758                    } catch (InterruptedException e) {
759                        Thread.currentThread().interrupt();
760                    }
761                } else {
762                    LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
763                }
764            }
765        }
766    
767        private List<URI> getConnectList() {
768            if (!updated.isEmpty()) {
769                return updated;
770            }
771            ArrayList<URI> l = new ArrayList<URI>(uris);
772            boolean removed = false;
773            if (failedConnectTransportURI != null) {
774                removed = l.remove(failedConnectTransportURI);
775            }
776            if (randomize) {
777                // Randomly, reorder the list by random swapping
778                for (int i = 0; i < l.size(); i++) {
779                    int p = (int) (Math.random() * 100 % l.size());
780                    URI t = l.get(p);
781                    l.set(p, l.get(i));
782                    l.set(i, t);
783                }
784            }
785            if (removed) {
786                l.add(failedConnectTransportURI);
787            }
788            if (LOG.isDebugEnabled()) {
789                LOG.debug("urlList connectionList:" + l + ", from: " + uris);
790            }
791            return l;
792        }
793    
794        @Override
795        public TransportListener getTransportListener() {
796            return transportListener;
797        }
798    
799        @Override
800        public void setTransportListener(TransportListener commandListener) {
801            synchronized (listenerMutex) {
802                this.transportListener = commandListener;
803                listenerMutex.notifyAll();
804            }
805        }
806    
807        @Override
808        public <T> T narrow(Class<T> target) {
809    
810            if (target.isAssignableFrom(getClass())) {
811                return target.cast(this);
812            }
813            Transport transport = connectedTransport.get();
814            if (transport != null) {
815                return transport.narrow(target);
816            }
817            return null;
818    
819        }
820    
821        protected void restoreTransport(Transport t) throws Exception, IOException {
822            t.start();
823            // send information to the broker - informing it we are an ft client
824            ConnectionControl cc = new ConnectionControl();
825            cc.setFaultTolerant(true);
826            t.oneway(cc);
827            stateTracker.restore(t);
828            Map<Integer, Command> tmpMap = null;
829            synchronized (requestMap) {
830                tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
831            }
832            for (Command command : tmpMap.values()) {
833                if (LOG.isTraceEnabled()) {
834                    LOG.trace("restore requestMap, replay: " + command);
835                }
836                t.oneway(command);
837            }
838        }
839    
840        public boolean isUseExponentialBackOff() {
841            return useExponentialBackOff;
842        }
843    
844        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
845            this.useExponentialBackOff = useExponentialBackOff;
846        }
847    
848        @Override
849        public String toString() {
850            return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
851        }
852    
853        @Override
854        public String getRemoteAddress() {
855            Transport transport = connectedTransport.get();
856            if (transport != null) {
857                return transport.getRemoteAddress();
858            }
859            return null;
860        }
861    
862        @Override
863        public boolean isFaultTolerant() {
864            return true;
865        }
866    
867        private void doUpdateURIsFromDisk() {
868            // If updateURIsURL is specified, read the file and add any new
869            // transport URI's to this FailOverTransport.
870            // Note: Could track file timestamp to avoid unnecessary reading.
871            String fileURL = getUpdateURIsURL();
872            if (fileURL != null) {
873                BufferedReader in = null;
874                String newUris = null;
875                StringBuffer buffer = new StringBuffer();
876    
877                try {
878                    in = new BufferedReader(getURLStream(fileURL));
879                    while (true) {
880                        String line = in.readLine();
881                        if (line == null) {
882                            break;
883                        }
884                        buffer.append(line);
885                    }
886                    newUris = buffer.toString();
887                } catch (IOException ioe) {
888                    LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
889                } finally {
890                    if (in != null) {
891                        try {
892                            in.close();
893                        } catch (IOException ioe) {
894                            // ignore
895                        }
896                    }
897                }
898    
899                processNewTransports(isRebalanceUpdateURIs(), newUris);
900            }
901        }
902    
903        final boolean doReconnect() {
904            Exception failure = null;
905            synchronized (reconnectMutex) {
906    
907                // First ensure we are up to date.
908                doUpdateURIsFromDisk();
909    
910                if (disposed || connectionFailure != null) {
911                    reconnectMutex.notifyAll();
912                }
913                if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
914                    return false;
915                } else {
916                    List<URI> connectList = getConnectList();
917                    if (connectList.isEmpty()) {
918                        failure = new IOException("No uris available to connect to.");
919                    } else {
920                        if (doRebalance) {
921                            if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) {
922                                // already connected to first in the list, no need to rebalance
923                                doRebalance = false;
924                                return false;
925                            } else {
926                                if (LOG.isDebugEnabled()) {
927                                    LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
928                                }
929    
930                                try {
931                                    Transport transport = this.connectedTransport.getAndSet(null);
932                                    if (transport != null) {
933                                        disposeTransport(transport);
934                                    }
935                                } catch (Exception e) {
936                                    if (LOG.isDebugEnabled()) {
937                                        LOG.debug("Caught an exception stopping existing transport for rebalance", e);
938                                    }
939                                }
940                            }
941                            doRebalance = false;
942                        }
943    
944                        resetReconnectDelay();
945    
946                        Transport transport = null;
947                        URI uri = null;
948    
949                        // If we have a backup already waiting lets try it.
950                        synchronized (backupMutex) {
951                            if ((priorityBackup || backup) && !backups.isEmpty()) {
952                                ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups);
953                                if (randomize) {
954                                    Collections.shuffle(l);
955                                }
956                                BackupTransport bt = l.remove(0);
957                                backups.remove(bt);
958                                transport = bt.getTransport();
959                                uri = bt.getUri();
960                                if (priorityBackup && priorityBackupAvailable) {
961                                    Transport old = this.connectedTransport.getAndSet(null);
962                                    if (old != null) {
963                                        disposeTransport(old);
964                                    }
965                                    priorityBackupAvailable = false;
966                                }
967                            }
968                        }
969    
970                        // Sleep for the reconnectDelay if there's no backup and we aren't trying
971                        // for the first time, or we were disposed for some reason.
972                        if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
973                            synchronized (sleepMutex) {
974                                if (LOG.isDebugEnabled()) {
975                                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
976                                }
977                                try {
978                                    sleepMutex.wait(reconnectDelay);
979                                } catch (InterruptedException e) {
980                                    Thread.currentThread().interrupt();
981                                }
982                            }
983                        }
984    
985                        Iterator<URI> iter = connectList.iterator();
986                        while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
987    
988                            try {
989                                SslContext.setCurrentSslContext(brokerSslContext);
990    
991                                // We could be starting with a backup and if so we wait to grab a
992                                // URI from the pool until next time around.
993                                if (transport == null) {
994                                    uri = addExtraQueryOptions(iter.next());
995                                    transport = TransportFactory.compositeConnect(uri);
996                                }
997    
998                                if (LOG.isDebugEnabled()) {
999                                    LOG.debug("Attempting  " + connectFailures + "th  connect to: " + uri);
1000                                }
1001                                transport.setTransportListener(myTransportListener);
1002                                transport.start();
1003    
1004                                if (started &&  !firstConnection) {
1005                                    restoreTransport(transport);
1006                                }
1007    
1008                                if (LOG.isDebugEnabled()) {
1009                                    LOG.debug("Connection established");
1010                                }
1011                                reconnectDelay = initialReconnectDelay;
1012                                connectedTransportURI = uri;
1013                                connectedTransport.set(transport);
1014                                connectedToPriority = isPriority(connectedTransportURI);
1015                                reconnectMutex.notifyAll();
1016                                connectFailures = 0;
1017    
1018                                // Make sure on initial startup, that the transportListener
1019                                // has been initialized for this instance.
1020                                synchronized (listenerMutex) {
1021                                    if (transportListener == null) {
1022                                        try {
1023                                            // if it isn't set after 2secs - it probably never will be
1024                                            listenerMutex.wait(2000);
1025                                        } catch (InterruptedException ex) {
1026                                        }
1027                                    }
1028                                }
1029    
1030                                if (transportListener != null) {
1031                                    transportListener.transportResumed();
1032                                } else {
1033                                    if (LOG.isDebugEnabled()) {
1034                                        LOG.debug("transport resumed by transport listener not set");
1035                                    }
1036                                }
1037    
1038                                if (firstConnection) {
1039                                    firstConnection = false;
1040                                    LOG.info("Successfully connected to " + uri);
1041                                } else {
1042                                    LOG.info("Successfully reconnected to " + uri);
1043                                }
1044    
1045                                connected = true;
1046                                return false;
1047                            } catch (Exception e) {
1048                                failure = e;
1049                                if (LOG.isDebugEnabled()) {
1050                                    LOG.debug("Connect fail to: " + uri + ", reason: " + e);
1051                                }
1052                                if (transport != null) {
1053                                    try {
1054                                        transport.stop();
1055                                        transport = null;
1056                                    } catch (Exception ee) {
1057                                        if (LOG.isDebugEnabled()) {
1058                                            LOG.debug("Stop of failed transport: " + transport +
1059                                                      " failed with reason: " + ee);
1060                                        }
1061                                    }
1062                                }
1063                            } finally {
1064                                SslContext.setCurrentSslContext(null);
1065                            }
1066                        }
1067                    }
1068                }
1069    
1070                int reconnectLimit = calculateReconnectAttemptLimit();
1071    
1072                connectFailures++;
1073                if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
1074                    LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
1075                    connectionFailure = failure;
1076    
1077                    // Make sure on initial startup, that the transportListener has been
1078                    // initialized for this instance.
1079                    synchronized (listenerMutex) {
1080                        if (transportListener == null) {
1081                            try {
1082                                listenerMutex.wait(2000);
1083                            } catch (InterruptedException ex) {
1084                            }
1085                        }
1086                    }
1087    
1088                    propagateFailureToExceptionListener(connectionFailure);
1089                    return false;
1090                }
1091            }
1092    
1093            if (!disposed) {
1094                doDelay();
1095            }
1096    
1097            return !disposed;
1098        }
1099    
1100        private void doDelay() {
1101            if (reconnectDelay > 0) {
1102                synchronized (sleepMutex) {
1103                    if (LOG.isDebugEnabled()) {
1104                        LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
1105                    }
1106                    try {
1107                        sleepMutex.wait(reconnectDelay);
1108                    } catch (InterruptedException e) {
1109                        Thread.currentThread().interrupt();
1110                    }
1111                }
1112            }
1113    
1114            if (useExponentialBackOff) {
1115                // Exponential increment of reconnect delay.
1116                reconnectDelay *= backOffMultiplier;
1117                if (reconnectDelay > maxReconnectDelay) {
1118                    reconnectDelay = maxReconnectDelay;
1119                }
1120            }
1121        }
1122    
1123        private void resetReconnectDelay() {
1124            if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
1125                reconnectDelay = initialReconnectDelay;
1126            }
1127        }
1128    
1129        /*
1130          * called with reconnectMutex held
1131         */
1132        private void propagateFailureToExceptionListener(Exception exception) {
1133            if (transportListener != null) {
1134                if (exception instanceof IOException) {
1135                    transportListener.onException((IOException)exception);
1136                } else {
1137                    transportListener.onException(IOExceptionSupport.create(exception));
1138                }
1139            }
1140            reconnectMutex.notifyAll();
1141        }
1142    
1143        private int calculateReconnectAttemptLimit() {
1144            int maxReconnectValue = this.maxReconnectAttempts;
1145            if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
1146                maxReconnectValue = this.startupMaxReconnectAttempts;
1147            }
1148            return maxReconnectValue;
1149        }
1150    
1151        private boolean shouldBuildBackups() {
1152           return (backup && backups.size() < backupPoolSize) || (priorityBackup && !(priorityBackupAvailable || connectedToPriority));
1153        }
1154    
    /**
     * Tops up the pool of pre-started backup transports.
     * <p>
     * All work runs under backupMutex, and only while the transport is not disposed and
     * shouldBuildBackups() holds.
     * <ol>
     * <li>Builds a candidate list: the priorityList entries first, then the connect-list
     * entries not already present.</li>
     * <li>Drops backups that are already marked disposed.</li>
     * <li>Starts a transport for each candidate that differs from the currently connected URI.
     * Nothing is built while there is no connected URI.</li>
     * </ol>
     * A backup for a priority URI goes to the head of the pool and sets priorityBackupAvailable.
     * If that makes the pool exceed backupPoolSize, the last backup is evicted: it is marked
     * disposed, its listener is switched to disposedListener, and its transport is disposed.
     * Failures while building a backup are logged at debug level and otherwise ignored.
     *
     * @return always {@code false}
     */
    final boolean buildBackups() {
        synchronized (backupMutex) {
            if (!disposed && shouldBuildBackups()) {
                // Candidates: priority URIs first, then the rest of the connect list (no duplicates).
                ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
                List<URI> connectList = getConnectList();
                for (URI uri: connectList) {
                    if (!backupList.contains(uri)) {
                        backupList.add(uri);
                    }
                }
                // removed disposed backups
                List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
                for (BackupTransport bt : backups) {
                    if (bt.isDisposed()) {
                        disposedList.add(bt);
                    }
                }
                backups.removeAll(disposedList);
                disposedList.clear();
                // Stop as soon as the pool no longer needs filling or the transport is disposed.
                for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && shouldBuildBackups(); ) {
                    URI uri = addExtraQueryOptions(iter.next());
                    // Never build a backup for the broker we are currently connected to.
                    if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
                        try {
                            // The broker SSL context is current while the nested transport is
                            // created; it is cleared again in the finally block.
                            SslContext.setCurrentSslContext(brokerSslContext);
                            BackupTransport bt = new BackupTransport(this);
                            bt.setUri(uri);
                            // NOTE(review): the duplicate check relies on BackupTransport.equals,
                            // presumably URI-based - confirm.
                            if (!backups.contains(bt)) {
                                Transport t = TransportFactory.compositeConnect(uri);
                                t.setTransportListener(bt);
                                t.start();
                                bt.setTransport(t);
                                if (priorityBackup && isPriority(uri)) {
                                   priorityBackupAvailable = true;
                                   backups.add(0, bt);
                                   // if this priority backup overflows the pool
                                   // remove the backup with the lowest priority
                                   if (backups.size() > backupPoolSize) {
                                       BackupTransport disposeTransport = backups.remove(backups.size() - 1);
                                       disposeTransport.setDisposed(true);
                                       Transport transport = disposeTransport.getTransport();
                                       if (transport != null) {
                                           transport.setTransportListener(disposedListener);
                                           disposeTransport(transport);
                                       }
                                   }
                                } else {
                                    backups.add(bt);
                                }
                            }
                        } catch (Exception e) {
                            LOG.debug("Failed to build backup ", e);
                        } finally {
                            SslContext.setCurrentSslContext(null);
                        }
                    }
                }
            }
        }
        return false;
    }
1215    
1216        protected boolean isPriority(URI uri) {
1217            if (!priorityBackup) {
1218                return false;
1219            }
1220    
1221            if (!priorityList.isEmpty()) {
1222                return priorityList.contains(uri);
1223            }
1224            return uris.indexOf(uri) == 0;
1225        }
1226    
1227        @Override
1228        public boolean isDisposed() {
1229            return disposed;
1230        }
1231    
1232        @Override
1233        public boolean isConnected() {
1234            return connected;
1235        }
1236    
1237        @Override
1238        public void reconnect(URI uri) throws IOException {
1239            add(true, new URI[]{uri});
1240        }
1241    
1242        @Override
1243        public boolean isReconnectSupported() {
1244            return this.reconnectSupported;
1245        }
1246    
1247        public void setReconnectSupported(boolean value) {
1248            this.reconnectSupported = value;
1249        }
1250    
1251        @Override
1252        public boolean isUpdateURIsSupported() {
1253            return this.updateURIsSupported;
1254        }
1255    
1256        public void setUpdateURIsSupported(boolean value) {
1257            this.updateURIsSupported = value;
1258        }
1259    
1260        @Override
1261        public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1262            if (isUpdateURIsSupported()) {
1263                HashSet<URI> copy = new HashSet<URI>(this.updated);
1264                updated.clear();
1265                if (updatedURIs != null && updatedURIs.length > 0) {
1266                    for (URI uri : updatedURIs) {
1267                        if (uri != null && !updated.contains(uri)) {
1268                            updated.add(uri);
1269                        }
1270                    }
1271                    if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) {
1272                        buildBackups();
1273                        synchronized (reconnectMutex) {
1274                            reconnect(rebalance);
1275                        }
1276                    }
1277                }
1278            }
1279        }
1280    
1281        /**
1282         * @return the updateURIsURL
1283         */
1284        public String getUpdateURIsURL() {
1285            return this.updateURIsURL;
1286        }
1287    
1288        /**
1289         * @param updateURIsURL the updateURIsURL to set
1290         */
1291        public void setUpdateURIsURL(String updateURIsURL) {
1292            this.updateURIsURL = updateURIsURL;
1293        }
1294    
1295        /**
1296         * @return the rebalanceUpdateURIs
1297         */
1298        public boolean isRebalanceUpdateURIs() {
1299            return this.rebalanceUpdateURIs;
1300        }
1301    
1302        /**
1303         * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
1304         */
1305        public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
1306            this.rebalanceUpdateURIs = rebalanceUpdateURIs;
1307        }
1308    
1309        @Override
1310        public int getReceiveCounter() {
1311            Transport transport = connectedTransport.get();
1312            if (transport == null) {
1313                return 0;
1314            }
1315            return transport.getReceiveCounter();
1316        }
1317    
1318        public int getConnectFailures() {
1319            return connectFailures;
1320        }
1321    
1322        public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
1323            synchronized (reconnectMutex) {
1324                stateTracker.connectionInterruptProcessingComplete(this, connectionId);
1325            }
1326        }
1327    
1328        public ConnectionStateTracker getStateTracker() {
1329            return stateTracker;
1330        }
1331    
1332        private boolean contains(URI newURI) {
1333            boolean result = false;
1334            for (URI uri : uris) {
1335                if (compareURIs(newURI, uri)) {
1336                    result = true;
1337                    break;
1338                }
1339            }
1340    
1341            return result;
1342        }
1343    
1344        private boolean compareURIs(final URI first, final URI second) {
1345    
1346            boolean result = false;
1347            if (first == null || second == null) {
1348                return result;
1349            }
1350    
1351            if (first.getPort() == second.getPort()) {
1352                InetAddress firstAddr = null;
1353                InetAddress secondAddr = null;
1354                try {
1355                    firstAddr = InetAddress.getByName(first.getHost());
1356                    secondAddr = InetAddress.getByName(second.getHost());
1357    
1358                    if (firstAddr.equals(secondAddr)) {
1359                        result = true;
1360                    }
1361    
1362                } catch(IOException e) {
1363    
1364                    if (firstAddr == null) {
1365                        LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e);
1366                    } else {
1367                        LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e);
1368                    }
1369    
1370                    if (first.getHost().equalsIgnoreCase(second.getHost())) {
1371                        result = true;
1372                    }
1373                }
1374            }
1375    
1376            return result;
1377        }
1378    
1379        private InputStreamReader getURLStream(String path) throws IOException {
1380            InputStreamReader result = null;
1381            URL url = null;
1382            try {
1383                url = new URL(path);
1384                result = new InputStreamReader(url.openStream());
1385            } catch (MalformedURLException e) {
1386                // ignore - it could be a path to a a local file
1387            }
1388            if (result == null) {
1389                result = new FileReader(path);
1390            }
1391            return result;
1392        }
1393    
1394        private URI addExtraQueryOptions(URI uri) {
1395            try {
1396                if( nestedExtraQueryOptions!=null && !nestedExtraQueryOptions.isEmpty() ) {
1397                    if( uri.getQuery() == null ) {
1398                        uri = URISupport.createURIWithQuery(uri, nestedExtraQueryOptions);
1399                    } else {
1400                        uri = URISupport.createURIWithQuery(uri, uri.getQuery()+"&"+nestedExtraQueryOptions);
1401                    }
1402                }
1403            } catch (URISyntaxException e) {
1404                throw new RuntimeException(e);
1405            }
1406            return uri;
1407        }
1408    
1409        public void setNestedExtraQueryOptions(String nestedExtraQueryOptions) {
1410            this.nestedExtraQueryOptions = nestedExtraQueryOptions;
1411        }
1412    
1413    }