001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.fanout;
018
019import java.io.IOException;
020import java.io.InterruptedIOException;
021import java.net.URI;
022import java.security.cert.X509Certificate;
023import java.util.ArrayList;
024import java.util.Iterator;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.atomic.AtomicInteger;
028
029import org.apache.activemq.command.Command;
030import org.apache.activemq.command.ConsumerInfo;
031import org.apache.activemq.command.Message;
032import org.apache.activemq.command.RemoveInfo;
033import org.apache.activemq.command.Response;
034import org.apache.activemq.state.ConnectionStateTracker;
035import org.apache.activemq.thread.Task;
036import org.apache.activemq.thread.TaskRunner;
037import org.apache.activemq.thread.TaskRunnerFactory;
038import org.apache.activemq.transport.CompositeTransport;
039import org.apache.activemq.transport.DefaultTransportListener;
040import org.apache.activemq.transport.FutureResponse;
041import org.apache.activemq.transport.ResponseCallback;
042import org.apache.activemq.transport.Transport;
043import org.apache.activemq.transport.TransportFactory;
044import org.apache.activemq.transport.TransportListener;
045import org.apache.activemq.util.IOExceptionSupport;
046import org.apache.activemq.util.ServiceStopper;
047import org.apache.activemq.util.ServiceSupport;
048import org.apache.activemq.wireformat.WireFormat;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * A Transport that fans out a connection to multiple brokers.
054 */
055public class FanoutTransport implements CompositeTransport {
056
057    private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class);
058
059    private TransportListener transportListener;
060    private boolean disposed;
061    private boolean connected;
062
063    private final Object reconnectMutex = new Object();
064    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
065    private final ConcurrentMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
066
067    private final TaskRunnerFactory reconnectTaskFactory;
068    private final TaskRunner reconnectTask;
069    private boolean started;
070
071    private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
072    private int connectedCount;
073
074    private int minAckCount = 2;
075
076    private long initialReconnectDelay = 10;
077    private long maxReconnectDelay = 1000 * 30;
078    private long backOffMultiplier = 2;
079    private final boolean useExponentialBackOff = true;
080    private int maxReconnectAttempts;
081    private Exception connectionFailure;
082    private FanoutTransportHandler primary;
083    private boolean fanOutQueues = false;
084
085    static class RequestCounter {
086
087        final Command command;
088        final AtomicInteger ackCount;
089
090        RequestCounter(Command command, int count) {
091            this.command = command;
092            this.ackCount = new AtomicInteger(count);
093        }
094
095        @Override
096        public String toString() {
097            return command.getCommandId() + "=" + ackCount.get();
098        }
099    }
100
101    class FanoutTransportHandler extends DefaultTransportListener {
102
103        private final URI uri;
104        private Transport transport;
105
106        private int connectFailures;
107        private long reconnectDelay = initialReconnectDelay;
108        private long reconnectDate;
109
110        public FanoutTransportHandler(URI uri) {
111            this.uri = uri;
112        }
113
114        @Override
115        public void onCommand(Object o) {
116            Command command = (Command) o;
117            if (command.isResponse()) {
118                Integer id = new Integer(((Response) command).getCorrelationId());
119                RequestCounter rc = requestMap.get(id);
120                if (rc != null) {
121                    if (rc.ackCount.decrementAndGet() <= 0) {
122                        requestMap.remove(id);
123                        transportListenerOnCommand(command);
124                    }
125                } else {
126                    transportListenerOnCommand(command);
127                }
128            } else {
129                transportListenerOnCommand(command);
130            }
131        }
132
133        @Override
134        public void onException(IOException error) {
135            try {
136                synchronized (reconnectMutex) {
137                    if (transport == null || !transport.isConnected()) {
138                        return;
139                    }
140
141                    LOG.debug("Transport failed, starting up reconnect task", error);
142
143                    ServiceSupport.dispose(transport);
144                    transport = null;
145                    connectedCount--;
146                    if (primary == this) {
147                        primary = null;
148                    }
149                    reconnectTask.wakeup();
150                }
151            } catch (InterruptedException e) {
152                Thread.currentThread().interrupt();
153                if (transportListener != null) {
154                    transportListener.onException(new InterruptedIOException());
155                }
156            }
157        }
158    }
159
160    public FanoutTransport() {
161        // Setup a task that is used to reconnect the a connection async.
162        reconnectTaskFactory = new TaskRunnerFactory();
163        reconnectTaskFactory.init();
164        reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
165            @Override
166            public boolean iterate() {
167                return doConnect();
168            }
169        }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this));
170    }
171
172    /**
173     * @return
174     */
175    private boolean doConnect() {
176        long closestReconnectDate = 0;
177        synchronized (reconnectMutex) {
178
179            if (disposed || connectionFailure != null) {
180                reconnectMutex.notifyAll();
181            }
182
183            if (transports.size() == connectedCount || disposed || connectionFailure != null) {
184                return false;
185            } else {
186
187                if (transports.isEmpty()) {
188                    // connectionFailure = new IOException("No uris available to
189                    // connect to.");
190                } else {
191
192                    // Try to connect them up.
193                    Iterator<FanoutTransportHandler> iter = transports.iterator();
194                    while (iter.hasNext() && !disposed) {
195
196                        long now = System.currentTimeMillis();
197
198                        FanoutTransportHandler fanoutHandler = iter.next();
199                        if (fanoutHandler.transport != null) {
200                            continue;
201                        }
202
203                        // Are we waiting a little to try to reconnect this one?
204                        if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
205                            if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
206                                closestReconnectDate = fanoutHandler.reconnectDate;
207                            }
208                            continue;
209                        }
210
211                        URI uri = fanoutHandler.uri;
212                        try {
213                            LOG.debug("Stopped: " + this);
214                            LOG.debug("Attempting connect to: " + uri);
215                            Transport t = TransportFactory.compositeConnect(uri);
216                            fanoutHandler.transport = t;
217                            t.setTransportListener(fanoutHandler);
218                            if (started) {
219                                restoreTransport(fanoutHandler);
220                            }
221                            LOG.debug("Connection established");
222                            fanoutHandler.reconnectDelay = initialReconnectDelay;
223                            fanoutHandler.connectFailures = 0;
224                            if (primary == null) {
225                                primary = fanoutHandler;
226                            }
227                            connectedCount++;
228                        } catch (Exception e) {
229                            LOG.debug("Connect fail to: " + uri + ", reason: " + e);
230
231                            if (fanoutHandler.transport != null) {
232                                ServiceSupport.dispose(fanoutHandler.transport);
233                                fanoutHandler.transport = null;
234                            }
235
236                            if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
237                                LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
238                                connectionFailure = e;
239                                reconnectMutex.notifyAll();
240                                return false;
241                            } else {
242
243                                if (useExponentialBackOff) {
244                                    // Exponential increment of reconnect delay.
245                                    fanoutHandler.reconnectDelay *= backOffMultiplier;
246                                    if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
247                                        fanoutHandler.reconnectDelay = maxReconnectDelay;
248                                    }
249                                }
250
251                                fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
252
253                                if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
254                                    closestReconnectDate = fanoutHandler.reconnectDate;
255                                }
256                            }
257                        }
258                    }
259
260                    if (transports.size() == connectedCount || disposed) {
261                        reconnectMutex.notifyAll();
262                        return false;
263                    }
264                }
265            }
266        }
267
268        try {
269            long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
270            if (reconnectDelay > 0) {
271                LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
272                Thread.sleep(reconnectDelay);
273            }
274        } catch (InterruptedException e1) {
275            Thread.currentThread().interrupt();
276        }
277        return true;
278    }
279
280    @Override
281    public void start() throws Exception {
282        synchronized (reconnectMutex) {
283            LOG.debug("Started.");
284            if (started) {
285                return;
286            }
287            started = true;
288            for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
289                FanoutTransportHandler th = iter.next();
290                if (th.transport != null) {
291                    restoreTransport(th);
292                }
293            }
294            connected = true;
295        }
296    }
297
298    @Override
299    public void stop() throws Exception {
300        try {
301            synchronized (reconnectMutex) {
302                ServiceStopper ss = new ServiceStopper();
303
304                if (!started) {
305                    return;
306                }
307                started = false;
308                disposed = true;
309                connected = false;
310
311                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
312                    FanoutTransportHandler th = iter.next();
313                    if (th.transport != null) {
314                        ss.stop(th.transport);
315                    }
316                }
317
318                LOG.debug("Stopped: " + this);
319                ss.throwFirstException();
320            }
321        } finally {
322            reconnectTask.shutdown();
323            reconnectTaskFactory.shutdownNow();
324        }
325    }
326
327    public int getMinAckCount() {
328        return minAckCount;
329    }
330
331    public void setMinAckCount(int minAckCount) {
332        this.minAckCount = minAckCount;
333    }
334
335    public long getInitialReconnectDelay() {
336        return initialReconnectDelay;
337    }
338
339    public void setInitialReconnectDelay(long initialReconnectDelay) {
340        this.initialReconnectDelay = initialReconnectDelay;
341    }
342
343    public long getMaxReconnectDelay() {
344        return maxReconnectDelay;
345    }
346
347    public void setMaxReconnectDelay(long maxReconnectDelay) {
348        this.maxReconnectDelay = maxReconnectDelay;
349    }
350
351    public long getReconnectDelayExponent() {
352        return backOffMultiplier;
353    }
354
355    public void setReconnectDelayExponent(long reconnectDelayExponent) {
356        this.backOffMultiplier = reconnectDelayExponent;
357    }
358
359    public int getMaxReconnectAttempts() {
360        return maxReconnectAttempts;
361    }
362
363    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
364        this.maxReconnectAttempts = maxReconnectAttempts;
365    }
366
367    @Override
368    public void oneway(Object o) throws IOException {
369        final Command command = (Command) o;
370        try {
371            synchronized (reconnectMutex) {
372
373                // Wait for transport to be connected.
374                while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
375                    LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
376                    reconnectMutex.wait(1000);
377                }
378
379                // Still not fully connected.
380                if (connectedCount < minAckCount) {
381
382                    Exception error;
383
384                    // Throw the right kind of error..
385                    if (disposed) {
386                        error = new IOException("Transport disposed.");
387                    } else if (connectionFailure != null) {
388                        error = connectionFailure;
389                    } else {
390                        error = new IOException("Unexpected failure.");
391                    }
392
393                    if (error instanceof IOException) {
394                        throw (IOException) error;
395                    }
396                    throw IOExceptionSupport.create(error);
397                }
398
399                // If it was a request and it was not being tracked by
400                // the state tracker,
401                // then hold it in the requestMap so that we can replay
402                // it later.
403                boolean fanout = isFanoutCommand(command);
404                if (stateTracker.track(command) == null && command.isResponseRequired()) {
405                    int size = fanout ? minAckCount : 1;
406                    requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
407                }
408
409                // Send the message.
410                if (fanout) {
411                    for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
412                        FanoutTransportHandler th = iter.next();
413                        if (th.transport != null) {
414                            try {
415                                th.transport.oneway(command);
416                            } catch (IOException e) {
417                                LOG.debug("Send attempt: failed.");
418                                th.onException(e);
419                            }
420                        }
421                    }
422                } else {
423                    try {
424                        primary.transport.oneway(command);
425                    } catch (IOException e) {
426                        LOG.debug("Send attempt: failed.");
427                        primary.onException(e);
428                    }
429                }
430            }
431        } catch (InterruptedException e) {
432            // Some one may be trying to stop our thread.
433            Thread.currentThread().interrupt();
434            throw new InterruptedIOException();
435        }
436    }
437
438    /**
439     * @param command
440     * @return
441     */
442    private boolean isFanoutCommand(Command command) {
443        if (command.isMessage()) {
444            if (fanOutQueues) {
445                return true;
446            }
447            return ((Message) command).getDestination().isTopic();
448        }
449        if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE || command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
450            return false;
451        }
452        return true;
453    }
454
455    @Override
456    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
457        throw new AssertionError("Unsupported Method");
458    }
459
460    @Override
461    public Object request(Object command) throws IOException {
462        throw new AssertionError("Unsupported Method");
463    }
464
465    @Override
466    public Object request(Object command, int timeout) throws IOException {
467        throw new AssertionError("Unsupported Method");
468    }
469
470    public void reconnect() {
471        LOG.debug("Waking up reconnect task");
472        try {
473            reconnectTask.wakeup();
474        } catch (InterruptedException e) {
475            Thread.currentThread().interrupt();
476        }
477    }
478
479    @Override
480    public TransportListener getTransportListener() {
481        return transportListener;
482    }
483
484    @Override
485    public void setTransportListener(TransportListener commandListener) {
486        this.transportListener = commandListener;
487    }
488
489    @Override
490    public <T> T narrow(Class<T> target) {
491        if (target.isAssignableFrom(getClass())) {
492            return target.cast(this);
493        }
494
495        synchronized (reconnectMutex) {
496            for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
497                FanoutTransportHandler th = iter.next();
498                if (th.transport != null) {
499                    T rc = th.transport.narrow(target);
500                    if (rc != null) {
501                        return rc;
502                    }
503                }
504            }
505        }
506
507        return null;
508    }
509
510    protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
511        th.transport.start();
512        stateTracker.setRestoreConsumers(th.transport == primary);
513        stateTracker.restore(th.transport);
514        for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
515            RequestCounter rc = iter2.next();
516            th.transport.oneway(rc.command);
517        }
518    }
519
520    @Override
521    public void add(boolean reblance, URI uris[]) {
522        synchronized (reconnectMutex) {
523            for (int i = 0; i < uris.length; i++) {
524                URI uri = uris[i];
525
526                boolean match = false;
527                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
528                    FanoutTransportHandler th = iter.next();
529                    if (th.uri.equals(uri)) {
530                        match = true;
531                        break;
532                    }
533                }
534
535                if (!match) {
536                    FanoutTransportHandler th = new FanoutTransportHandler(uri);
537                    transports.add(th);
538                    reconnect();
539                }
540            }
541        }
542    }
543
544    @Override
545    public void remove(boolean rebalance, URI uris[]) {
546        synchronized (reconnectMutex) {
547            for (int i = 0; i < uris.length; i++) {
548                URI uri = uris[i];
549
550                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
551                    FanoutTransportHandler th = iter.next();
552                    if (th.uri.equals(uri)) {
553                        if (th.transport != null) {
554                            ServiceSupport.dispose(th.transport);
555                            connectedCount--;
556                        }
557                        iter.remove();
558                        break;
559                    }
560                }
561            }
562        }
563    }
564
565    @Override
566    public void reconnect(URI uri) throws IOException {
567        add(true, new URI[] { uri });
568    }
569
570    @Override
571    public boolean isReconnectSupported() {
572        return true;
573    }
574
575    @Override
576    public boolean isUpdateURIsSupported() {
577        return true;
578    }
579
580    @Override
581    public void updateURIs(boolean reblance, URI[] uris) throws IOException {
582        add(reblance, uris);
583    }
584
585    @Override
586    public String getRemoteAddress() {
587        if (primary != null) {
588            if (primary.transport != null) {
589                return primary.transport.getRemoteAddress();
590            }
591        }
592        return null;
593    }
594
595    protected void transportListenerOnCommand(Command command) {
596        if (transportListener != null) {
597            transportListener.onCommand(command);
598        }
599    }
600
601    @Override
602    public boolean isFaultTolerant() {
603        return true;
604    }
605
606    public boolean isFanOutQueues() {
607        return fanOutQueues;
608    }
609
610    public void setFanOutQueues(boolean fanOutQueues) {
611        this.fanOutQueues = fanOutQueues;
612    }
613
614    @Override
615    public boolean isDisposed() {
616        return disposed;
617    }
618
619    @Override
620    public boolean isConnected() {
621        return connected;
622    }
623
624    @Override
625    public int getReceiveCounter() {
626        int rc = 0;
627        synchronized (reconnectMutex) {
628            for (FanoutTransportHandler th : transports) {
629                if (th.transport != null) {
630                    rc += th.transport.getReceiveCounter();
631                }
632            }
633        }
634        return rc;
635    }
636
637    @Override
638    public X509Certificate[] getPeerCertificates() {
639        return null;
640    }
641
642    @Override
643    public void setPeerCertificates(X509Certificate[] certificates) {
644
645    }
646
647    @Override
648    public WireFormat getWireFormat() {
649        return null;
650    }
651}