001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.failover;
018
019import java.io.BufferedReader;
020import java.io.FileReader;
021import java.io.IOException;
022import java.io.InputStreamReader;
023import java.io.InterruptedIOException;
024import java.net.InetAddress;
025import java.net.MalformedURLException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.URL;
029import java.util.ArrayList;
030import java.util.Collections;
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.LinkedHashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.StringTokenizer;
037import java.util.concurrent.CopyOnWriteArrayList;
038import java.util.concurrent.atomic.AtomicReference;
039
040import org.apache.activemq.broker.SslContext;
041import org.apache.activemq.command.Command;
042import org.apache.activemq.command.ConnectionControl;
043import org.apache.activemq.command.ConsumerControl;
044import org.apache.activemq.command.ConnectionId;
045import org.apache.activemq.command.MessageDispatch;
046import org.apache.activemq.command.MessagePull;
047import org.apache.activemq.command.RemoveInfo;
048import org.apache.activemq.command.Response;
049
050import org.apache.activemq.state.ConnectionStateTracker;
051import org.apache.activemq.state.Tracked;
052import org.apache.activemq.thread.Task;
053import org.apache.activemq.thread.TaskRunner;
054import org.apache.activemq.thread.TaskRunnerFactory;
055import org.apache.activemq.transport.CompositeTransport;
056import org.apache.activemq.transport.DefaultTransportListener;
057import org.apache.activemq.transport.FutureResponse;
058import org.apache.activemq.transport.ResponseCallback;
059import org.apache.activemq.transport.Transport;
060import org.apache.activemq.transport.TransportFactory;
061import org.apache.activemq.transport.TransportListener;
062import org.apache.activemq.util.IOExceptionSupport;
063import org.apache.activemq.util.ServiceSupport;
064import org.apache.activemq.util.URISupport;
065import org.slf4j.Logger;
066import org.slf4j.LoggerFactory;
067
068/**
069 * A Transport that is made reliable by being able to fail over to another
070 * transport when a transport failure is detected.
071 */
072public class FailoverTransport implements CompositeTransport {
073
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
    /** Initial value of both {@link #initialReconnectDelay} and {@link #reconnectDelay}. */
    private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
    /** Sentinel used as the default for {@link #timeout} and the reconnect attempt limits. */
    private static final int INFINITE = -1;
    /** Listener that receives the commands and interruption/resume events forwarded by this transport; may be null. */
    private TransportListener transportListener;
    /** Set to true by {@link #stop()}; checked by the reconnect task and by the send loop in {@link #oneway(Object)}. */
    private boolean disposed;
    /** Broker URIs registered through the add/remove methods. */
    private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
    /** When non-empty, {@link #getConnectList()} returns this list instead of deriving one from {@link #uris}. */
    private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();

    /** Monitor held by start/stop/reconnect/oneway and by failure handling; senders wait on it for a connection. */
    private final Object reconnectMutex = new Object();
    /** Monitor held while the reconnect task decides whether to reconnect and while stop() drains the backups. */
    private final Object backupMutex = new Object();
    /** Monitor that stop() notifies; the waiting side is not visible in this part of the file. */
    private final Object sleepMutex = new Object();
    // NOTE(review): no use of listenerMutex is visible in this part of the file.
    private final Object listenerMutex = new Object();
    /** Tracks sent commands; restored onto an already connected transport in start(). */
    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
    /** Commands (or their Tracked wrappers) awaiting a response, keyed by command id; accessed under synchronized (requestMap). */
    private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();

    /** URI of the currently connected transport; cleared when a transport failure is handled. */
    private URI connectedTransportURI;
    /** URI of the transport that failed most recently; moved to the end of the connect list. */
    private URI failedConnectTransportURI;
    /** The currently connected transport, or null while disconnected. */
    private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
    /** Factory owning the reconnect task; shut down in stop(). */
    private final TaskRunnerFactory reconnectTaskFactory;
    /** Task runner that performs reconnects and builds backups when woken up. */
    private final TaskRunner reconnectTask;
    /** True between start() and stop(). */
    private boolean started;
    // Reconnect back-off settings. Units are presumably milliseconds -- TODO confirm against doDelay().
    private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    private long maxReconnectDelay = 1000 * 30;
    private double backOffMultiplier = 2d;
    /** Upper bound, in milliseconds, that oneway() waits for a connection when sending a message; only values &gt; 0 apply. */
    private long timeout = INFINITE;
    private boolean useExponentialBackOff = true;
    /** When true, getConnectList() returns the candidate URIs in random order. */
    private boolean randomize = true;
    private int maxReconnectAttempts = INFINITE;
    private int startupMaxReconnectAttempts = INFINITE;
    private int connectFailures;
    private int warnAfterReconnectAttempts = 10;
    private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    /** When non-null, oneway() stops waiting for a connection and reports this failure to the caller. */
    private Exception connectionFailure;
    /** While true, willReconnect() reports that a connection attempt is still expected. */
    private boolean firstConnection = true;
    // optionally always have a backup created
    private boolean backup = false;
    /** Backup transports; disposed and cleared in stop(). */
    private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
    private int backupPoolSize = 1;
    // The next three settings are pushed into the state tracker by start().
    private boolean trackMessages = false;
    private boolean trackTransactionProducers = true;
    private int maxCacheSize = 128 * 1024;
    /** No-op listener installed on transports that are being disposed so late events are ignored. */
    private final TransportListener disposedListener = new DefaultTransportListener() {
    };
    /** Listener created by createTransportListener(); also used by oneway() to deliver simulated responses. */
    private final TransportListener myTransportListener = createTransportListener();
    /** Gates processing of broker-supplied URI lists in processNewTransports(). */
    private boolean updateURIsSupported = true;
    /** Gates broker-requested reconnects in handleConnectionControl(). */
    private boolean reconnectSupported = true;
    // SSL context captured in the constructor; remembered for the reconnect thread
    private SslContext brokerSslContext;
    private String updateURIsURL = null;
    private boolean rebalanceUpdateURIs = true;
    /** Set by reconnect(true); makes the reconnect task reconnect even while connected. */
    private boolean doRebalance = false;
    /** Reset to false when the connected transport fails. */
    private boolean connectedToPriority = false;

    /** When true and not connected to a priority URI, the reconnect task keeps re-waking itself after a delay. */
    private boolean priorityBackup = false;
    /** URIs registered through setPriorityURIs(). */
    private final ArrayList<URI> priorityList = new ArrayList<URI>();
    /** When true, the reconnect task performs a reconnect even while connected. */
    private boolean priorityBackupAvailable = false;
    private String nestedExtraQueryOptions;
    /** Set once a ShutdownInfo command has been sent; later transport failures are then ignored. */
    private boolean shuttingDown = false;
132
133    public FailoverTransport() {
134        brokerSslContext = SslContext.getCurrentSslContext();
135        stateTracker.setTrackTransactions(true);
136        // Setup a task that is used to reconnect the a connection async.
137        reconnectTaskFactory = new TaskRunnerFactory();
138        reconnectTaskFactory.init();
139        reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
140            @Override
141            public boolean iterate() {
142                boolean result = false;
143                if (!started) {
144                    return result;
145                }
146                boolean buildBackup = true;
147                synchronized (backupMutex) {
148                    if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
149                        result = doReconnect();
150                        buildBackup = false;
151                    }
152                }
153                if (buildBackup) {
154                    buildBackups();
155                    if (priorityBackup && !connectedToPriority) {
156                        try {
157                            doDelay();
158                            if (reconnectTask == null) {
159                                return true;
160                            }
161                            reconnectTask.wakeup();
162                        } catch (InterruptedException e) {
163                            LOG.debug("Reconnect task has been interrupted.", e);
164                        }
165                    }
166                } else {
167                    // build backups on the next iteration
168                    buildBackup = true;
169                    try {
170                        if (reconnectTask == null) {
171                            return true;
172                        }
173                        reconnectTask.wakeup();
174                    } catch (InterruptedException e) {
175                        LOG.debug("Reconnect task has been interrupted.", e);
176                    }
177                }
178                return result;
179            }
180
181        }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
182    }
183
184    TransportListener createTransportListener() {
185        return new TransportListener() {
186            @Override
187            public void onCommand(Object o) {
188                Command command = (Command) o;
189                if (command == null) {
190                    return;
191                }
192                if (command.isResponse()) {
193                    Object object = null;
194                    synchronized (requestMap) {
195                        object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
196                    }
197                    if (object != null && object.getClass() == Tracked.class) {
198                        ((Tracked) object).onResponses(command);
199                    }
200                }
201
202                if (command.isConnectionControl()) {
203                    handleConnectionControl((ConnectionControl) command);
204                }
205                else if (command.isConsumerControl()) {
206                    ConsumerControl consumerControl = (ConsumerControl)command;
207                    if (consumerControl.isClose()) {
208                        stateTracker.processRemoveConsumer(consumerControl.getConsumerId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
209                    }
210
211                }
212                if (transportListener != null) {
213                    transportListener.onCommand(command);
214                }
215            }
216
217            @Override
218            public void onException(IOException error) {
219                try {
220                    handleTransportFailure(error);
221                } catch (InterruptedException e) {
222                    Thread.currentThread().interrupt();
223                    transportListener.onException(new InterruptedIOException());
224                }
225            }
226
227            @Override
228            public void transportInterupted() {
229                if (transportListener != null) {
230                    transportListener.transportInterupted();
231                }
232            }
233
234            @Override
235            public void transportResumed() {
236                if (transportListener != null) {
237                    transportListener.transportResumed();
238                }
239            }
240        };
241    }
242
243    public final void disposeTransport(Transport transport) {
244        transport.setTransportListener(disposedListener);
245        ServiceSupport.dispose(transport);
246    }
247
248    public final void handleTransportFailure(IOException e) throws InterruptedException {
249        if (shuttingDown) {
250            // shutdown info sent and remote socket closed and we see that before a local close
251            // let the close do the work
252            return;
253        }
254
255        if (LOG.isTraceEnabled()) {
256            LOG.trace(this + " handleTransportFailure: " + e, e);
257        }
258
259        // could be blocked in write with the reconnectMutex held, but still needs to be whacked
260        Transport transport = connectedTransport.getAndSet(null);
261        if (transport != null) {
262            disposeTransport(transport);
263        }
264
265        synchronized (reconnectMutex) {
266            if (transport != null && connectedTransport.get() == null) {
267
268                boolean reconnectOk = false;
269
270                if (canReconnect()) {
271                    reconnectOk = true;
272                }
273                 LOG.warn("Transport (" + connectedTransportURI + ") failed"
274                        + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e);
275
276                failedConnectTransportURI = connectedTransportURI;
277                connectedTransportURI = null;
278                connectedToPriority = false;
279
280                if (reconnectOk) {
281                    // notify before any reconnect attempt so ack state can be whacked
282                    if (transportListener != null) {
283                        transportListener.transportInterupted();
284                    }
285
286                    updated.remove(failedConnectTransportURI);
287                    reconnectTask.wakeup();
288                } else if (!isDisposed()) {
289                    propagateFailureToExceptionListener(e);
290                }
291            }
292        }
293    }
294
295    private boolean canReconnect() {
296        return started && 0 != calculateReconnectAttemptLimit();
297    }
298
299    public final void handleConnectionControl(ConnectionControl control) {
300        String reconnectStr = control.getReconnectTo();
301        if (LOG.isTraceEnabled()) {
302            LOG.trace("Received ConnectionControl: {}", control);
303        }
304
305        if (reconnectStr != null) {
306            reconnectStr = reconnectStr.trim();
307            if (reconnectStr.length() > 0) {
308                try {
309                    URI uri = new URI(reconnectStr);
310                    if (isReconnectSupported()) {
311                        reconnect(uri);
312                        LOG.info("Reconnected to: " + uri);
313                    }
314                } catch (Exception e) {
315                    LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
316                }
317            }
318        }
319        processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
320    }
321
322    private final void processNewTransports(boolean rebalance, String newTransports) {
323        if (newTransports != null) {
324            newTransports = newTransports.trim();
325            if (newTransports.length() > 0 && isUpdateURIsSupported()) {
326                List<URI> list = new ArrayList<URI>();
327                StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
328                while (tokenizer.hasMoreTokens()) {
329                    String str = tokenizer.nextToken();
330                    try {
331                        URI uri = new URI(str);
332                        list.add(uri);
333                    } catch (Exception e) {
334                        LOG.error("Failed to parse broker address: " + str, e);
335                    }
336                }
337                if (list.isEmpty() == false) {
338                    try {
339                        updateURIs(rebalance, list.toArray(new URI[list.size()]));
340                    } catch (IOException e) {
341                        LOG.error("Failed to update transport URI's from: " + newTransports, e);
342                    }
343                }
344            }
345        }
346    }
347
348    @Override
349    public void start() throws Exception {
350        synchronized (reconnectMutex) {
351            if (LOG.isDebugEnabled()) {
352                LOG.debug("Started " + this);
353            }
354            if (started) {
355                return;
356            }
357            started = true;
358            stateTracker.setMaxCacheSize(getMaxCacheSize());
359            stateTracker.setTrackMessages(isTrackMessages());
360            stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
361            if (connectedTransport.get() != null) {
362                stateTracker.restore(connectedTransport.get());
363            } else {
364                reconnect(false);
365            }
366        }
367    }
368
369    @Override
370    public void stop() throws Exception {
371        Transport transportToStop = null;
372        List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
373
374        try {
375            synchronized (reconnectMutex) {
376                if (LOG.isDebugEnabled()) {
377                    LOG.debug("Stopped " + this);
378                }
379                if (!started) {
380                    return;
381                }
382                started = false;
383                disposed = true;
384
385                if (connectedTransport.get() != null) {
386                    transportToStop = connectedTransport.getAndSet(null);
387                }
388                reconnectMutex.notifyAll();
389            }
390            synchronized (sleepMutex) {
391                sleepMutex.notifyAll();
392            }
393        } finally {
394            reconnectTask.shutdown();
395            reconnectTaskFactory.shutdownNow();
396        }
397
398        synchronized(backupMutex) {
399            for (BackupTransport backup : backups) {
400                backup.setDisposed(true);
401                Transport transport = backup.getTransport();
402                if (transport != null) {
403                    transport.setTransportListener(disposedListener);
404                    backupsToStop.add(transport);
405                }
406            }
407            backups.clear();
408        }
409        for (Transport transport : backupsToStop) {
410            try {
411                if (LOG.isTraceEnabled()) {
412                    LOG.trace("Stopped backup: " + transport);
413                }
414                disposeTransport(transport);
415            } catch (Exception e) {
416            }
417        }
418        if (transportToStop != null) {
419            transportToStop.stop();
420        }
421    }
422
423    public long getInitialReconnectDelay() {
424        return initialReconnectDelay;
425    }
426
427    public void setInitialReconnectDelay(long initialReconnectDelay) {
428        this.initialReconnectDelay = initialReconnectDelay;
429    }
430
431    public long getMaxReconnectDelay() {
432        return maxReconnectDelay;
433    }
434
435    public void setMaxReconnectDelay(long maxReconnectDelay) {
436        this.maxReconnectDelay = maxReconnectDelay;
437    }
438
439    public long getReconnectDelay() {
440        return reconnectDelay;
441    }
442
443    public void setReconnectDelay(long reconnectDelay) {
444        this.reconnectDelay = reconnectDelay;
445    }
446
447    public double getReconnectDelayExponent() {
448        return backOffMultiplier;
449    }
450
451    public void setReconnectDelayExponent(double reconnectDelayExponent) {
452        this.backOffMultiplier = reconnectDelayExponent;
453    }
454
455    public Transport getConnectedTransport() {
456        return connectedTransport.get();
457    }
458
459    public URI getConnectedTransportURI() {
460        return connectedTransportURI;
461    }
462
463    public int getMaxReconnectAttempts() {
464        return maxReconnectAttempts;
465    }
466
467    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
468        this.maxReconnectAttempts = maxReconnectAttempts;
469    }
470
471    public int getStartupMaxReconnectAttempts() {
472        return this.startupMaxReconnectAttempts;
473    }
474
475    public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
476        this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
477    }
478
479    public long getTimeout() {
480        return timeout;
481    }
482
483    public void setTimeout(long timeout) {
484        this.timeout = timeout;
485    }
486
487    /**
488     * @return Returns the randomize.
489     */
490    public boolean isRandomize() {
491        return randomize;
492    }
493
494    /**
495     * @param randomize The randomize to set.
496     */
497    public void setRandomize(boolean randomize) {
498        this.randomize = randomize;
499    }
500
501    public boolean isBackup() {
502        return backup;
503    }
504
505    public void setBackup(boolean backup) {
506        this.backup = backup;
507    }
508
509    public int getBackupPoolSize() {
510        return backupPoolSize;
511    }
512
513    public void setBackupPoolSize(int backupPoolSize) {
514        this.backupPoolSize = backupPoolSize;
515    }
516
517    public int getCurrentBackups() {
518        return this.backups.size();
519    }
520
521    public boolean isTrackMessages() {
522        return trackMessages;
523    }
524
525    public void setTrackMessages(boolean trackMessages) {
526        this.trackMessages = trackMessages;
527    }
528
529    public boolean isTrackTransactionProducers() {
530        return this.trackTransactionProducers;
531    }
532
533    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
534        this.trackTransactionProducers = trackTransactionProducers;
535    }
536
537    public int getMaxCacheSize() {
538        return maxCacheSize;
539    }
540
541    public void setMaxCacheSize(int maxCacheSize) {
542        this.maxCacheSize = maxCacheSize;
543    }
544
545    public boolean isPriorityBackup() {
546        return priorityBackup;
547    }
548
549    public void setPriorityBackup(boolean priorityBackup) {
550        this.priorityBackup = priorityBackup;
551    }
552
553    public void setPriorityURIs(String priorityURIs) {
554        StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
555        while (tokenizer.hasMoreTokens()) {
556            String str = tokenizer.nextToken();
557            try {
558                URI uri = new URI(str);
559                priorityList.add(uri);
560            } catch (Exception e) {
561                LOG.error("Failed to parse broker address: " + str, e);
562            }
563        }
564    }
565
566    @Override
567    public void oneway(Object o) throws IOException {
568
569        Command command = (Command) o;
570        Exception error = null;
571        try {
572
573            synchronized (reconnectMutex) {
574
575                if (command != null && connectedTransport.get() == null) {
576                    if (command.isShutdownInfo()) {
577                        // Skipping send of ShutdownInfo command when not connected.
578                        return;
579                    } else if (command instanceof RemoveInfo || command.isMessageAck()) {
580                        // Simulate response to RemoveInfo command or MessageAck (as it will be stale)
581                        stateTracker.track(command);
582                        if (command.isResponseRequired()) {
583                            Response response = new Response();
584                            response.setCorrelationId(command.getCommandId());
585                            myTransportListener.onCommand(response);
586                        }
587                        return;
588                    } else if (command instanceof MessagePull) {
589                        // Simulate response to MessagePull if timed as we can't honor that now.
590                        MessagePull pullRequest = (MessagePull) command;
591                        if (pullRequest.getTimeout() != 0) {
592                            MessageDispatch dispatch = new MessageDispatch();
593                            dispatch.setConsumerId(pullRequest.getConsumerId());
594                            dispatch.setDestination(pullRequest.getDestination());
595                            myTransportListener.onCommand(dispatch);
596                        }
597                        return;
598                    }
599                }
600
601                // Keep trying until the message is sent.
602                for (int i = 0; !disposed; i++) {
603                    try {
604
605                        // Wait for transport to be connected.
606                        Transport transport = connectedTransport.get();
607                        long start = System.currentTimeMillis();
608                        boolean timedout = false;
609                        while (transport == null && !disposed && connectionFailure == null
610                                && !Thread.currentThread().isInterrupted() && willReconnect()) {
611                            if (LOG.isTraceEnabled()) {
612                                LOG.trace("Waiting for transport to reconnect..: " + command);
613                            }
614                            long end = System.currentTimeMillis();
615                            if (command.isMessage() && timeout > 0 && (end - start > timeout)) {
616                                timedout = true;
617                                if (LOG.isInfoEnabled()) {
618                                    LOG.info("Failover timed out after " + (end - start) + "ms");
619                                }
620                                break;
621                            }
622                            try {
623                                reconnectMutex.wait(100);
624                            } catch (InterruptedException e) {
625                                Thread.currentThread().interrupt();
626                                if (LOG.isDebugEnabled()) {
627                                    LOG.debug("Interupted: " + e, e);
628                                }
629                            }
630                            transport = connectedTransport.get();
631                        }
632
633                        if (transport == null) {
634                            // Previous loop may have exited due to use being
635                            // disposed.
636                            if (disposed) {
637                                error = new IOException("Transport disposed.");
638                            } else if (connectionFailure != null) {
639                                error = connectionFailure;
640                            } else if (timedout == true) {
641                                error = new IOException("Failover timeout of " + timeout + " ms reached.");
642                            } else if (!willReconnect()) {
643                                error = new IOException("Reconnect attempts of " + maxReconnectAttempts + " exceeded");
644                            } else {
645                                error = new IOException("Unexpected failure.");
646                            }
647                            break;
648                        }
649
650                        Tracked tracked = null;
651                        try {
652                            tracked = stateTracker.track(command);
653                        } catch (IOException ioe) {
654                            LOG.debug("Cannot track the command " + command, ioe);
655                        }
656                        // If it was a request and it was not being tracked by
657                        // the state tracker,
658                        // then hold it in the requestMap so that we can replay
659                        // it later.
660                        synchronized (requestMap) {
661                            if (tracked != null && tracked.isWaitingForResponse()) {
662                                requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
663                            } else if (tracked == null && command.isResponseRequired()) {
664                                requestMap.put(Integer.valueOf(command.getCommandId()), command);
665                            }
666                        }
667
668                        // Send the message.
669                        try {
670                            transport.oneway(command);
671                            stateTracker.trackBack(command);
672                            if (command.isShutdownInfo()) {
673                                shuttingDown = true;
674                            }
675                        } catch (IOException e) {
676
677                            // If the command was not tracked.. we will retry in
678                            // this method
679                            if (tracked == null && canReconnect()) {
680
681                                // since we will retry in this method.. take it
682                                // out of the request
683                                // map so that it is not sent 2 times on
684                                // recovery
685                                if (command.isResponseRequired()) {
686                                    requestMap.remove(Integer.valueOf(command.getCommandId()));
687                                }
688
689                                // Rethrow the exception so it will handled by
690                                // the outer catch
691                                throw e;
692                            } else {
693                                // Handle the error but allow the method to return since the
694                                // tracked commands are replayed on reconnect.
695                                if (LOG.isDebugEnabled()) {
696                                    LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
697                                }
698                                handleTransportFailure(e);
699                            }
700                        }
701
702                        return;
703
704                    } catch (IOException e) {
705                        if (LOG.isDebugEnabled()) {
706                            LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
707                        }
708                        handleTransportFailure(e);
709                    }
710                }
711            }
712        } catch (InterruptedException e) {
713            // Some one may be trying to stop our thread.
714            Thread.currentThread().interrupt();
715            throw new InterruptedIOException();
716        }
717
718        if (!disposed) {
719            if (error != null) {
720                if (error instanceof IOException) {
721                    throw (IOException) error;
722                }
723                throw IOExceptionSupport.create(error);
724            }
725        }
726    }
727
728    private boolean willReconnect() {
729        return firstConnection || 0 != calculateReconnectAttemptLimit();
730    }
731
732    @Override
733    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
734        throw new AssertionError("Unsupported Method");
735    }
736
737    @Override
738    public Object request(Object command) throws IOException {
739        throw new AssertionError("Unsupported Method");
740    }
741
742    @Override
743    public Object request(Object command, int timeout) throws IOException {
744        throw new AssertionError("Unsupported Method");
745    }
746
747    @Override
748    public void add(boolean rebalance, URI u[]) {
749        boolean newURI = false;
750        for (URI uri : u) {
751            if (!contains(uri)) {
752                uris.add(uri);
753                newURI = true;
754            }
755        }
756        if (newURI) {
757            reconnect(rebalance);
758        }
759    }
760
761    @Override
762    public void remove(boolean rebalance, URI u[]) {
763        for (URI uri : u) {
764            uris.remove(uri);
765        }
766        // rebalance is automatic if any connected to removed/stopped broker
767    }
768
769    public void add(boolean rebalance, String u) {
770        try {
771            URI newURI = new URI(u);
772            if (contains(newURI) == false) {
773                uris.add(newURI);
774                reconnect(rebalance);
775            }
776
777        } catch (Exception e) {
778            LOG.error("Failed to parse URI: " + u);
779        }
780    }
781
782    public void reconnect(boolean rebalance) {
783        synchronized (reconnectMutex) {
784            if (started) {
785                if (rebalance) {
786                    doRebalance = true;
787                }
788                LOG.debug("Waking up reconnect task");
789                try {
790                    reconnectTask.wakeup();
791                } catch (InterruptedException e) {
792                    Thread.currentThread().interrupt();
793                }
794            } else {
795                LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
796            }
797        }
798    }
799
800    private List<URI> getConnectList() {
801        if (!updated.isEmpty()) {
802            return updated;
803        }
804        ArrayList<URI> l = new ArrayList<URI>(uris);
805        boolean removed = false;
806        if (failedConnectTransportURI != null) {
807            removed = l.remove(failedConnectTransportURI);
808        }
809        if (randomize) {
810            // Randomly, reorder the list by random swapping
811            for (int i = 0; i < l.size(); i++) {
812                // meed parenthesis due other JDKs (see AMQ-4826)
813                int p = ((int) (Math.random() * 100)) % l.size();
814                URI t = l.get(p);
815                l.set(p, l.get(i));
816                l.set(i, t);
817            }
818        }
819        if (removed) {
820            l.add(failedConnectTransportURI);
821        }
822        if (LOG.isDebugEnabled()) {
823            LOG.debug("urlList connectionList:" + l + ", from: " + uris);
824        }
825        return l;
826    }
827
828    @Override
829    public TransportListener getTransportListener() {
830        return transportListener;
831    }
832
833    @Override
834    public void setTransportListener(TransportListener commandListener) {
835        synchronized (listenerMutex) {
836            this.transportListener = commandListener;
837            listenerMutex.notifyAll();
838        }
839    }
840
841    @Override
842    public <T> T narrow(Class<T> target) {
843
844        if (target.isAssignableFrom(getClass())) {
845            return target.cast(this);
846        }
847        Transport transport = connectedTransport.get();
848        if (transport != null) {
849            return transport.narrow(target);
850        }
851        return null;
852
853    }
854
855    protected void restoreTransport(Transport t) throws Exception, IOException {
856        t.start();
857        // send information to the broker - informing it we are an ft client
858        ConnectionControl cc = new ConnectionControl();
859        cc.setFaultTolerant(true);
860        t.oneway(cc);
861        stateTracker.restore(t);
862        Map<Integer, Command> tmpMap = null;
863        synchronized (requestMap) {
864            tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
865        }
866        for (Command command : tmpMap.values()) {
867            if (LOG.isTraceEnabled()) {
868                LOG.trace("restore requestMap, replay: " + command);
869            }
870            t.oneway(command);
871        }
872    }
873
874    public boolean isUseExponentialBackOff() {
875        return useExponentialBackOff;
876    }
877
    /**
     * @param useExponentialBackOff whether the reconnect delay should be multiplied by the
     *        back-off multiplier after each wait
     */
    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
        this.useExponentialBackOff = useExponentialBackOff;
    }
881
882    @Override
883    public String toString() {
884        return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
885    }
886
887    @Override
888    public String getRemoteAddress() {
889        Transport transport = connectedTransport.get();
890        if (transport != null) {
891            return transport.getRemoteAddress();
892        }
893        return null;
894    }
895
    /**
     * @return always {@code true} for this transport
     */
    @Override
    public boolean isFaultTolerant() {
        return true;
    }
900
901    private void doUpdateURIsFromDisk() {
902        // If updateURIsURL is specified, read the file and add any new
903        // transport URI's to this FailOverTransport.
904        // Note: Could track file timestamp to avoid unnecessary reading.
905        String fileURL = getUpdateURIsURL();
906        if (fileURL != null) {
907            BufferedReader in = null;
908            String newUris = null;
909            StringBuffer buffer = new StringBuffer();
910
911            try {
912                in = new BufferedReader(getURLStream(fileURL));
913                while (true) {
914                    String line = in.readLine();
915                    if (line == null) {
916                        break;
917                    }
918                    buffer.append(line);
919                }
920                newUris = buffer.toString();
921            } catch (IOException ioe) {
922                LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
923            } finally {
924                if (in != null) {
925                    try {
926                        in.close();
927                    } catch (IOException ioe) {
928                        // ignore
929                    }
930                }
931            }
932
933            processNewTransports(isRebalanceUpdateURIs(), newUris);
934        }
935    }
936
    /**
     * Runs a single pass of the connect / reconnect logic.
     * <p>
     * While holding {@code reconnectMutex} the method refreshes the URI list from
     * {@code updateURIsURL}, optionally disposes the current transport for a rebalance, and
     * then tries a waiting backup transport (if any) followed by each URI from
     * {@code getConnectList()} until one of them starts successfully. On success the
     * connected transport and URI are recorded, waiters on {@code reconnectMutex} are
     * notified and the transport listener is told the transport resumed.
     * <p>
     * When no connection could be made the failure counter is incremented; once the limit
     * from {@code calculateReconnectAttemptLimit()} is reached the last failure is stored in
     * {@code connectionFailure} and propagated to the transport listener.
     *
     * @return {@code false} when this pass ended without needing a follow-up (already
     *         connected, disposed, a connection failure is recorded, no rebalance needed, a
     *         connection was established, or the attempt limit was reached); otherwise
     *         {@code !disposed}, evaluated after {@code doDelay()} has run.
     *         NOTE(review): presumably the caller re-runs this method while {@code true} is
     *         returned - confirm against the reconnect task.
     */
    final boolean doReconnect() {
        Exception failure = null;
        synchronized (reconnectMutex) {

            // First ensure we are up to date.
            doUpdateURIsFromDisk();

            // Wake anyone waiting on the mutex when there is nothing more to wait for.
            if (disposed || connectionFailure != null) {
                reconnectMutex.notifyAll();
            }
            // Nothing to do if we are connected (and neither a rebalance nor a priority
            // backup switch is pending), disposed, or have already failed for good.
            if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
                return false;
            } else {
                List<URI> connectList = getConnectList();
                if (connectList.isEmpty()) {
                    failure = new IOException("No uris available to connect to.");
                } else {
                    if (doRebalance) {
                        if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) {
                            // already connected to first in the list, no need to rebalance
                            doRebalance = false;
                            return false;
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
                            }

                            // Drop the current transport so the loop below connects afresh.
                            try {
                                Transport transport = this.connectedTransport.getAndSet(null);
                                if (transport != null) {
                                    disposeTransport(transport);
                                }
                            } catch (Exception e) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Caught an exception stopping existing transport for rebalance", e);
                                }
                            }
                        }
                        doRebalance = false;
                    }

                    resetReconnectDelay();

                    Transport transport = null;
                    URI uri = null;

                    // If we have a backup already waiting lets try it.
                    synchronized (backupMutex) {
                        if ((priorityBackup || backup) && !backups.isEmpty()) {
                            ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups);
                            if (randomize) {
                                Collections.shuffle(l);
                            }
                            BackupTransport bt = l.remove(0);
                            backups.remove(bt);
                            transport = bt.getTransport();
                            uri = bt.getUri();
                            myTransportListener.onCommand(bt.getBrokerInfo());
                            // Switching to a priority backup: dispose the transport we are
                            // currently connected through.
                            if (priorityBackup && priorityBackupAvailable) {
                                Transport old = this.connectedTransport.getAndSet(null);
                                if (old != null) {
                                    disposeTransport(old);
                                }
                                priorityBackupAvailable = false;
                            }
                        }
                    }

                    // Sleep for the reconnectDelay if there's no backup and we aren't trying
                    // for the first time, or we were disposed for some reason.
                    if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
                        synchronized (sleepMutex) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
                            }
                            try {
                                sleepMutex.wait(reconnectDelay);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }

                    // Try the backup transport first (if one was picked above), then each
                    // URI of the connect list, until connected or disposed.
                    Iterator<URI> iter = connectList.iterator();
                    while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {

                        try {
                            SslContext.setCurrentSslContext(brokerSslContext);

                            // We could be starting with a backup and if so we wait to grab a
                            // URI from the pool until next time around.
                            if (transport == null) {
                                uri = addExtraQueryOptions(iter.next());
                                transport = TransportFactory.compositeConnect(uri);
                            }

                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Attempting  " + connectFailures + "th  connect to: " + uri);
                            }
                            transport.setTransportListener(myTransportListener);
                            transport.start();

                            // Only replay tracked state on a reconnect, not on the first connect.
                            if (started &&  !firstConnection) {
                                restoreTransport(transport);
                            }

                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Connection established");
                            }
                            reconnectDelay = initialReconnectDelay;
                            connectedTransportURI = uri;
                            connectedTransport.set(transport);
                            connectedToPriority = isPriority(connectedTransportURI);
                            reconnectMutex.notifyAll();
                            connectFailures = 0;

                            // Make sure on initial startup, that the transportListener
                            // has been initialized for this instance.
                            synchronized (listenerMutex) {
                                if (transportListener == null) {
                                    try {
                                        // if it isn't set after 2secs - it probably never will be
                                        listenerMutex.wait(2000);
                                    } catch (InterruptedException ex) {
                                        // NOTE(review): the interrupt is swallowed here and the
                                        // thread's interrupt flag is not restored.
                                    }
                                }
                            }

                            if (transportListener != null) {
                                transportListener.transportResumed();
                            } else {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("transport resumed by transport listener not set");
                                }
                            }

                            if (firstConnection) {
                                firstConnection = false;
                                LOG.info("Successfully connected to " + uri);
                            } else {
                                LOG.info("Successfully reconnected to " + uri);
                            }

                            return false;
                        } catch (Exception e) {
                            // Remember the failure and move on to the next candidate.
                            failure = e;
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Connect fail to: " + uri + ", reason: " + e);
                            }
                            if (transport != null) {
                                try {
                                    transport.stop();
                                    // cleared so the next iteration pulls a URI from the list
                                    transport = null;
                                } catch (Exception ee) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug("Stop of failed transport: " + transport +
                                                  " failed with reason: " + ee);
                                    }
                                }
                            }
                        } finally {
                            SslContext.setCurrentSslContext(null);
                        }
                    }
                }
            }

            // Reaching this point means no connection was established in this pass.
            int reconnectLimit = calculateReconnectAttemptLimit();

            connectFailures++;
            if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
                LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
                connectionFailure = failure;

                // Make sure on initial startup, that the transportListener has been
                // initialized for this instance.
                synchronized (listenerMutex) {
                    if (transportListener == null) {
                        try {
                            listenerMutex.wait(2000);
                        } catch (InterruptedException ex) {
                            // NOTE(review): the interrupt is swallowed here and the thread's
                            // interrupt flag is not restored.
                        }
                    }
                }

                propagateFailureToExceptionListener(connectionFailure);
                return false;
            }

            // Periodic warning, every warnInterval failed attempts (disabled when <= 0).
            int warnInterval = getWarnAfterReconnectAttempts();
            if (warnInterval > 0 && (connectFailures % warnInterval) == 0) {
                LOG.warn("Failed to connect to {} after: {} attempt(s) continuing to retry.",
                         uris, connectFailures);
            }
        }

        // The delay happens outside reconnectMutex.
        if (!disposed) {
            doDelay();
        }

        return !disposed;
    }
1139
1140    private void doDelay() {
1141        if (reconnectDelay > 0) {
1142            synchronized (sleepMutex) {
1143                if (LOG.isDebugEnabled()) {
1144                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
1145                }
1146                try {
1147                    sleepMutex.wait(reconnectDelay);
1148                } catch (InterruptedException e) {
1149                    Thread.currentThread().interrupt();
1150                }
1151            }
1152        }
1153
1154        if (useExponentialBackOff) {
1155            // Exponential increment of reconnect delay.
1156            reconnectDelay *= backOffMultiplier;
1157            if (reconnectDelay > maxReconnectDelay) {
1158                reconnectDelay = maxReconnectDelay;
1159            }
1160        }
1161    }
1162
1163    private void resetReconnectDelay() {
1164        if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
1165            reconnectDelay = initialReconnectDelay;
1166        }
1167    }
1168
1169    /*
1170      * called with reconnectMutex held
1171     */
1172    private void propagateFailureToExceptionListener(Exception exception) {
1173        if (transportListener != null) {
1174            if (exception instanceof IOException) {
1175                transportListener.onException((IOException)exception);
1176            } else {
1177                transportListener.onException(IOExceptionSupport.create(exception));
1178            }
1179        }
1180        reconnectMutex.notifyAll();
1181    }
1182
1183    private int calculateReconnectAttemptLimit() {
1184        int maxReconnectValue = this.maxReconnectAttempts;
1185        if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
1186            maxReconnectValue = this.startupMaxReconnectAttempts;
1187        }
1188        return maxReconnectValue;
1189    }
1190
1191    private boolean shouldBuildBackups() {
1192       return (backup && backups.size() < backupPoolSize) || (priorityBackup && !(priorityBackupAvailable || connectedToPriority));
1193    }
1194
    /**
     * Tops up the pool of pre-connected backup transports while holding {@code backupMutex}.
     * <p>
     * Candidate URIs are the priority list followed by any URI from
     * {@code getConnectList()} not already in it. Backups flagged as disposed are dropped
     * from the pool first; then, for as long as {@code shouldBuildBackups()} holds and the
     * transport is not disposed, a transport is connected and started for each candidate
     * that differs from the currently connected URI and is not already in the pool.
     * Priority backups are placed at the head of the pool and may evict the last entry when
     * the pool would exceed {@code backupPoolSize}.
     *
     * @return always {@code false}
     */
    final boolean buildBackups() {
        synchronized (backupMutex) {
            if (!disposed && shouldBuildBackups()) {
                // Priority URIs come first, followed by the remaining connect-list URIs.
                ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
                List<URI> connectList = getConnectList();
                for (URI uri: connectList) {
                    if (!backupList.contains(uri)) {
                        backupList.add(uri);
                    }
                }
                // removed disposed backups
                List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
                for (BackupTransport bt : backups) {
                    if (bt.isDisposed()) {
                        disposedList.add(bt);
                    }
                }
                backups.removeAll(disposedList);
                disposedList.clear();
                for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && shouldBuildBackups(); ) {
                    URI uri = addExtraQueryOptions(iter.next());
                    // Skipped entirely while no connected URI is recorded, and for the URI
                    // we are currently connected to.
                    if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
                        try {
                            SslContext.setCurrentSslContext(brokerSslContext);
                            BackupTransport bt = new BackupTransport(this);
                            bt.setUri(uri);
                            // NOTE(review): relies on BackupTransport equality to detect an
                            // existing backup for this URI - confirm against BackupTransport.
                            if (!backups.contains(bt)) {
                                Transport t = TransportFactory.compositeConnect(uri);
                                t.setTransportListener(bt);
                                t.start();
                                bt.setTransport(t);
                                if (priorityBackup && isPriority(uri)) {
                                   priorityBackupAvailable = true;
                                   backups.add(0, bt);
                                   // if this priority backup overflows the pool
                                   // remove the backup with the lowest priority
                                   if (backups.size() > backupPoolSize) {
                                       BackupTransport disposeTransport = backups.remove(backups.size() - 1);
                                       disposeTransport.setDisposed(true);
                                       Transport transport = disposeTransport.getTransport();
                                       if (transport != null) {
                                           transport.setTransportListener(disposedListener);
                                           disposeTransport(transport);
                                       }
                                   }
                                } else {
                                    backups.add(bt);
                                }
                            }
                        } catch (Exception e) {
                            // A failed backup is only logged; the loop carries on with the next URI.
                            LOG.debug("Failed to build backup ", e);
                        } finally {
                            SslContext.setCurrentSslContext(null);
                        }
                    }
                }
            }
        }
        return false;
    }
1255
1256    protected boolean isPriority(URI uri) {
1257        if (!priorityBackup) {
1258            return false;
1259        }
1260
1261        if (!priorityList.isEmpty()) {
1262            return priorityList.contains(uri);
1263        }
1264        return uris.indexOf(uri) == 0;
1265    }
1266
1267    @Override
1268    public boolean isDisposed() {
1269        return disposed;
1270    }
1271
1272    @Override
1273    public boolean isConnected() {
1274        return connectedTransport.get() != null;
1275    }
1276
1277    @Override
1278    public void reconnect(URI uri) throws IOException {
1279        add(true, new URI[]{uri});
1280    }
1281
1282    @Override
1283    public boolean isReconnectSupported() {
1284        return this.reconnectSupported;
1285    }
1286
1287    public void setReconnectSupported(boolean value) {
1288        this.reconnectSupported = value;
1289    }
1290
1291    @Override
1292    public boolean isUpdateURIsSupported() {
1293        return this.updateURIsSupported;
1294    }
1295
1296    public void setUpdateURIsSupported(boolean value) {
1297        this.updateURIsSupported = value;
1298    }
1299
1300    @Override
1301    public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1302        if (isUpdateURIsSupported()) {
1303            HashSet<URI> copy = new HashSet<URI>();
1304            synchronized (reconnectMutex) {
1305                copy.addAll(this.updated);
1306                updated.clear();
1307                if (updatedURIs != null && updatedURIs.length > 0) {
1308                    for (URI uri : updatedURIs) {
1309                        if (uri != null && !updated.contains(uri)) {
1310                            updated.add(uri);
1311                        }
1312                    }
1313                }
1314            }
1315            if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) {
1316                buildBackups();
1317                reconnect(rebalance);
1318            }
1319        }
1320    }
1321
1322    /**
1323     * @return the updateURIsURL
1324     */
1325    public String getUpdateURIsURL() {
1326        return this.updateURIsURL;
1327    }
1328
    /**
     * @param updateURIsURL the location (a URL, or a path to a local file) from which
     *        updated URIs are read
     */
    public void setUpdateURIsURL(String updateURIsURL) {
        this.updateURIsURL = updateURIsURL;
    }
1335
1336    /**
1337     * @return the rebalanceUpdateURIs
1338     */
1339    public boolean isRebalanceUpdateURIs() {
1340        return this.rebalanceUpdateURIs;
1341    }
1342
    /**
     * @param rebalanceUpdateURIs the rebalance flag to use when URIs read from
     *        {@code updateURIsURL} are processed
     */
    public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
        this.rebalanceUpdateURIs = rebalanceUpdateURIs;
    }
1349
1350    @Override
1351    public int getReceiveCounter() {
1352        Transport transport = connectedTransport.get();
1353        if (transport == null) {
1354            return 0;
1355        }
1356        return transport.getReceiveCounter();
1357    }
1358
1359    public int getConnectFailures() {
1360        return connectFailures;
1361    }
1362
    /**
     * Forwards the notification to the state tracker while holding {@code reconnectMutex}.
     *
     * @param connectionId the connection whose interrupt processing has completed
     */
    public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
        synchronized (reconnectMutex) {
            stateTracker.connectionInterruptProcessingComplete(this, connectionId);
        }
    }
1368
1369    public ConnectionStateTracker getStateTracker() {
1370        return stateTracker;
1371    }
1372
1373    private boolean contains(URI newURI) {
1374        boolean result = false;
1375        for (URI uri : uris) {
1376            if (compareURIs(newURI, uri)) {
1377                result = true;
1378                break;
1379            }
1380        }
1381
1382        return result;
1383    }
1384
1385    private boolean compareURIs(final URI first, final URI second) {
1386
1387        boolean result = false;
1388        if (first == null || second == null) {
1389            return result;
1390        }
1391
1392        if (first.getPort() == second.getPort()) {
1393            InetAddress firstAddr = null;
1394            InetAddress secondAddr = null;
1395            try {
1396                firstAddr = InetAddress.getByName(first.getHost());
1397                secondAddr = InetAddress.getByName(second.getHost());
1398
1399                if (firstAddr.equals(secondAddr)) {
1400                    result = true;
1401                }
1402
1403            } catch(IOException e) {
1404
1405                if (firstAddr == null) {
1406                    LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e);
1407                } else {
1408                    LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e);
1409                }
1410
1411                if (first.getHost().equalsIgnoreCase(second.getHost())) {
1412                    result = true;
1413                }
1414            }
1415        }
1416
1417        return result;
1418    }
1419
1420    private InputStreamReader getURLStream(String path) throws IOException {
1421        InputStreamReader result = null;
1422        URL url = null;
1423        try {
1424            url = new URL(path);
1425            result = new InputStreamReader(url.openStream());
1426        } catch (MalformedURLException e) {
1427            // ignore - it could be a path to a a local file
1428        }
1429        if (result == null) {
1430            result = new FileReader(path);
1431        }
1432        return result;
1433    }
1434
1435    private URI addExtraQueryOptions(URI uri) {
1436        try {
1437            if( nestedExtraQueryOptions!=null && !nestedExtraQueryOptions.isEmpty() ) {
1438                if( uri.getQuery() == null ) {
1439                    uri = URISupport.createURIWithQuery(uri, nestedExtraQueryOptions);
1440                } else {
1441                    uri = URISupport.createURIWithQuery(uri, uri.getQuery()+"&"+nestedExtraQueryOptions);
1442                }
1443            }
1444        } catch (URISyntaxException e) {
1445            throw new RuntimeException(e);
1446        }
1447        return uri;
1448    }
1449
    /**
     * @param nestedExtraQueryOptions query options that are appended to the query of each
     *        URI passed through {@code addExtraQueryOptions}
     */
    public void setNestedExtraQueryOptions(String nestedExtraQueryOptions) {
        this.nestedExtraQueryOptions = nestedExtraQueryOptions;
    }
1453
1454    public int getWarnAfterReconnectAttempts() {
1455        return warnAfterReconnectAttempts;
1456    }
1457
    /**
     * Sets the number of connect / reconnect attempts that must occur before a warn message
     * is logged indicating that the transport is not connected. This can be useful when the
     * client is running inside some container or service, as it gives an indication of a
     * problem with the client connection that might not otherwise be visible. To disable the
     * log messages this value should be set to a value {@code attempts <= 0}.
     *
     * @param warnAfterReconnectAttempts
     *      The number of failed connection attempts that must happen before a warning is logged.
     */
    public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) {
        this.warnAfterReconnectAttempts = warnAfterReconnectAttempts;
    }
1471
1472}