001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.fanout;
018    
019    import java.io.IOException;
020    import java.io.InterruptedIOException;
021    import java.net.URI;
022    import java.util.ArrayList;
023    import java.util.Iterator;
024    import java.util.concurrent.ConcurrentHashMap;
025    import java.util.concurrent.atomic.AtomicInteger;
026    
027    import org.apache.activemq.command.Command;
028    import org.apache.activemq.command.ConsumerInfo;
029    import org.apache.activemq.command.Message;
030    import org.apache.activemq.command.RemoveInfo;
031    import org.apache.activemq.command.Response;
032    import org.apache.activemq.state.ConnectionStateTracker;
033    import org.apache.activemq.thread.Task;
034    import org.apache.activemq.thread.TaskRunner;
035    import org.apache.activemq.thread.TaskRunnerFactory;
036    import org.apache.activemq.transport.CompositeTransport;
037    import org.apache.activemq.transport.DefaultTransportListener;
038    import org.apache.activemq.transport.FutureResponse;
039    import org.apache.activemq.transport.ResponseCallback;
040    import org.apache.activemq.transport.Transport;
041    import org.apache.activemq.transport.TransportFactory;
042    import org.apache.activemq.transport.TransportListener;
043    import org.apache.activemq.util.IOExceptionSupport;
044    import org.apache.activemq.util.ServiceStopper;
045    import org.apache.activemq.util.ServiceSupport;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * A Transport that fans out a connection to multiple brokers.
051     * 
052     * 
053     */
054    public class FanoutTransport implements CompositeTransport {
055    
056        private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class);
057    
058        private TransportListener transportListener;
059        private boolean disposed;
060        private boolean connected;
061    
062        private final Object reconnectMutex = new Object();
063        private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
064        private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
065    
066        private final TaskRunnerFactory reconnectTaskFactory;
067        private final TaskRunner reconnectTask;
068        private boolean started;
069    
070        private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
071        private int connectedCount;
072    
073        private int minAckCount = 2;
074    
075        private long initialReconnectDelay = 10;
076        private long maxReconnectDelay = 1000 * 30;
077        private long backOffMultiplier = 2;
078        private final boolean useExponentialBackOff = true;
079        private int maxReconnectAttempts;
080        private Exception connectionFailure;
081        private FanoutTransportHandler primary;
082        private boolean fanOutQueues = false;
083    
084        static class RequestCounter {
085    
086            final Command command;
087            final AtomicInteger ackCount;
088    
089            RequestCounter(Command command, int count) {
090                this.command = command;
091                this.ackCount = new AtomicInteger(count);
092            }
093    
094            @Override
095            public String toString() {
096                return command.getCommandId() + "=" + ackCount.get();
097            }
098        }
099    
100        class FanoutTransportHandler extends DefaultTransportListener {
101    
102            private final URI uri;
103            private Transport transport;
104    
105            private int connectFailures;
106            private long reconnectDelay = initialReconnectDelay;
107            private long reconnectDate;
108    
109            public FanoutTransportHandler(URI uri) {
110                this.uri = uri;
111            }
112    
113            @Override
114            public void onCommand(Object o) {
115                Command command = (Command)o;
116                if (command.isResponse()) {
117                    Integer id = new Integer(((Response)command).getCorrelationId());
118                    RequestCounter rc = requestMap.get(id);
119                    if (rc != null) {
120                        if (rc.ackCount.decrementAndGet() <= 0) {
121                            requestMap.remove(id);
122                            transportListenerOnCommand(command);
123                        }
124                    } else {
125                        transportListenerOnCommand(command);
126                    }
127                } else {
128                    transportListenerOnCommand(command);
129                }
130            }
131    
132            @Override
133            public void onException(IOException error) {
134                try {
135                    synchronized (reconnectMutex) {
136                        if (transport == null || !transport.isConnected()) {
137                            return;
138                        }
139    
140                        LOG.debug("Transport failed, starting up reconnect task", error);
141    
142                        ServiceSupport.dispose(transport);
143                        transport = null;
144                        connectedCount--;
145                        if (primary == this) {
146                            primary = null;
147                        }
148                        reconnectTask.wakeup();
149                    }
150                } catch (InterruptedException e) {
151                    Thread.currentThread().interrupt();
152                    if (transportListener != null) {
153                        transportListener.onException(new InterruptedIOException());
154                    }
155                }
156            }
157        }
158    
159        public FanoutTransport() throws InterruptedIOException {
160            // Setup a task that is used to reconnect the a connection async.
161            reconnectTaskFactory = new TaskRunnerFactory();
162            reconnectTaskFactory.init();
163            reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
164                public boolean iterate() {
165                    return doConnect();
166                }
167            }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this));
168        }
169    
170        /**
171         * @return
172         */
173        private boolean doConnect() {
174            long closestReconnectDate = 0;
175            synchronized (reconnectMutex) {
176    
177                if (disposed || connectionFailure != null) {
178                    reconnectMutex.notifyAll();
179                }
180    
181                if (transports.size() == connectedCount || disposed || connectionFailure != null) {
182                    return false;
183                } else {
184    
185                    if (transports.isEmpty()) {
186                        // connectionFailure = new IOException("No uris available to
187                        // connect to.");
188                    } else {
189    
190                        // Try to connect them up.
191                        Iterator<FanoutTransportHandler> iter = transports.iterator();
192                        for (int i = 0; iter.hasNext() && !disposed; i++) {
193    
194                            long now = System.currentTimeMillis();
195    
196                            FanoutTransportHandler fanoutHandler = iter.next();
197                            if (fanoutHandler.transport != null) {
198                                continue;
199                            }
200    
201                            // Are we waiting a little to try to reconnect this one?
202                            if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
203                                if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
204                                    closestReconnectDate = fanoutHandler.reconnectDate;
205                                }
206                                continue;
207                            }
208    
209                            URI uri = fanoutHandler.uri;
210                            try {
211                                LOG.debug("Stopped: " + this);
212                                LOG.debug("Attempting connect to: " + uri);
213                                Transport t = TransportFactory.compositeConnect(uri);
214                                fanoutHandler.transport = t;
215                                t.setTransportListener(fanoutHandler);
216                                if (started) {
217                                    restoreTransport(fanoutHandler);
218                                }
219                                LOG.debug("Connection established");
220                                fanoutHandler.reconnectDelay = initialReconnectDelay;
221                                fanoutHandler.connectFailures = 0;
222                                if (primary == null) {
223                                    primary = fanoutHandler;
224                                }
225                                connectedCount++;
226                            } catch (Exception e) {
227                                LOG.debug("Connect fail to: " + uri + ", reason: " + e);
228    
229                                if( fanoutHandler.transport !=null ) {
230                                    ServiceSupport.dispose(fanoutHandler.transport);
231                                    fanoutHandler.transport=null;
232                                }
233                                
234                                if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
235                                    LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
236                                    connectionFailure = e;
237                                    reconnectMutex.notifyAll();
238                                    return false;
239                                } else {
240    
241                                    if (useExponentialBackOff) {
242                                        // Exponential increment of reconnect delay.
243                                        fanoutHandler.reconnectDelay *= backOffMultiplier;
244                                        if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
245                                            fanoutHandler.reconnectDelay = maxReconnectDelay;
246                                        }
247                                    }
248    
249                                    fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
250    
251                                    if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
252                                        closestReconnectDate = fanoutHandler.reconnectDate;
253                                    }
254                                }
255                            }
256                        }
257                        if (transports.size() == connectedCount || disposed) {
258                            reconnectMutex.notifyAll();
259                            return false;
260                        }
261    
262                    }
263                }
264    
265            }
266    
267            try {
268                long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
269                if (reconnectDelay > 0) {
270                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
271                    Thread.sleep(reconnectDelay);
272                }
273            } catch (InterruptedException e1) {
274                Thread.currentThread().interrupt();
275            }
276            return true;
277        }
278    
279        public void start() throws Exception {
280            synchronized (reconnectMutex) {
281                LOG.debug("Started.");
282                if (started) {
283                    return;
284                }
285                started = true;
286                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
287                    FanoutTransportHandler th = iter.next();
288                    if (th.transport != null) {
289                        restoreTransport(th);
290                    }
291                }
292                connected=true;
293            }
294        }
295    
296        public void stop() throws Exception {
297            try {
298                synchronized (reconnectMutex) {
299                    ServiceStopper ss = new ServiceStopper();
300    
301                    if (!started) {
302                        return;
303                    }
304                    started = false;
305                    disposed = true;
306                    connected=false;
307    
308                    for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
309                        FanoutTransportHandler th = iter.next();
310                        if (th.transport != null) {
311                            ss.stop(th.transport);
312                        }
313                    }
314    
315                    LOG.debug("Stopped: " + this);
316                    ss.throwFirstException();
317                }
318            } finally {
319                reconnectTask.shutdown();
320                reconnectTaskFactory.shutdownNow();
321            }
322        }
323    
324            public int getMinAckCount() {
325                    return minAckCount;
326            }
327    
328            public void setMinAckCount(int minAckCount) {
329                    this.minAckCount = minAckCount;
330            }    
331        
332        public long getInitialReconnectDelay() {
333            return initialReconnectDelay;
334        }
335    
336        public void setInitialReconnectDelay(long initialReconnectDelay) {
337            this.initialReconnectDelay = initialReconnectDelay;
338        }
339    
340        public long getMaxReconnectDelay() {
341            return maxReconnectDelay;
342        }
343    
344        public void setMaxReconnectDelay(long maxReconnectDelay) {
345            this.maxReconnectDelay = maxReconnectDelay;
346        }
347    
348        public long getReconnectDelayExponent() {
349            return backOffMultiplier;
350        }
351    
352        public void setReconnectDelayExponent(long reconnectDelayExponent) {
353            this.backOffMultiplier = reconnectDelayExponent;
354        }
355    
356        public int getMaxReconnectAttempts() {
357            return maxReconnectAttempts;
358        }
359    
360        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
361            this.maxReconnectAttempts = maxReconnectAttempts;
362        }
363    
364        public void oneway(Object o) throws IOException {
365            final Command command = (Command)o;
366            try {
367                synchronized (reconnectMutex) {
368    
369                    // Wait for transport to be connected.
370                    while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
371                        LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
372                        reconnectMutex.wait(1000);
373                    }
374    
375                    // Still not fully connected.
376                    if (connectedCount < minAckCount) {
377    
378                        Exception error;
379    
380                        // Throw the right kind of error..
381                        if (disposed) {
382                            error = new IOException("Transport disposed.");
383                        } else if (connectionFailure != null) {
384                            error = connectionFailure;
385                        } else {
386                            error = new IOException("Unexpected failure.");
387                        }
388    
389                        if (error instanceof IOException) {
390                            throw (IOException)error;
391                        }
392                        throw IOExceptionSupport.create(error);
393                    }
394    
395                    // If it was a request and it was not being tracked by
396                    // the state tracker,
397                    // then hold it in the requestMap so that we can replay
398                    // it later.
399                    boolean fanout = isFanoutCommand(command);
400                    if (stateTracker.track(command) == null && command.isResponseRequired()) {
401                        int size = fanout ? minAckCount : 1;
402                        requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
403                    }
404                    
405                    // Send the message.
406                    if (fanout) {
407                        for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
408                            FanoutTransportHandler th = iter.next();
409                            if (th.transport != null) {
410                                try {
411                                    th.transport.oneway(command);
412                                } catch (IOException e) {
413                                    LOG.debug("Send attempt: failed.");
414                                    th.onException(e);
415                                }
416                            }
417                        }
418                    } else {
419                        try {
420                            primary.transport.oneway(command);
421                        } catch (IOException e) {
422                            LOG.debug("Send attempt: failed.");
423                            primary.onException(e);
424                        }
425                    }
426    
427                }
428            } catch (InterruptedException e) {
429                // Some one may be trying to stop our thread.
430                Thread.currentThread().interrupt();
431                throw new InterruptedIOException();
432            }
433        }
434    
435        /**
436         * @param command
437         * @return
438         */
439        private boolean isFanoutCommand(Command command) {
440            if (command.isMessage()) {
441                if( fanOutQueues ) {
442                    return true;
443                }
444                return ((Message)command).getDestination().isTopic();
445            }
446            if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ||
447                    command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
448                return false;
449            }
450            return true;
451        }
452    
453        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
454            throw new AssertionError("Unsupported Method");
455        }
456    
457        public Object request(Object command) throws IOException {
458            throw new AssertionError("Unsupported Method");
459        }
460    
461        public Object request(Object command, int timeout) throws IOException {
462            throw new AssertionError("Unsupported Method");
463        }
464    
465        public void reconnect() {
466            LOG.debug("Waking up reconnect task");
467            try {
468                reconnectTask.wakeup();
469            } catch (InterruptedException e) {
470                Thread.currentThread().interrupt();
471            }
472        }
473    
474        public TransportListener getTransportListener() {
475            return transportListener;
476        }
477    
478        public void setTransportListener(TransportListener commandListener) {
479            this.transportListener = commandListener;
480        }
481    
482        public <T> T narrow(Class<T> target) {
483    
484            if (target.isAssignableFrom(getClass())) {
485                return target.cast(this);
486            }
487    
488            synchronized (reconnectMutex) {
489                for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
490                    FanoutTransportHandler th = iter.next();
491                    if (th.transport != null) {
492                        T rc = th.transport.narrow(target);
493                        if (rc != null) {
494                            return rc;
495                        }
496                    }
497                }
498            }
499    
500            return null;
501    
502        }
503    
504        protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
505            th.transport.start();
506            stateTracker.setRestoreConsumers(th.transport == primary);
507            stateTracker.restore(th.transport);
508            for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
509                RequestCounter rc = iter2.next();
510                th.transport.oneway(rc.command);
511            }
512        }
513    
514        public void add(boolean reblance,URI uris[]) {
515    
516            synchronized (reconnectMutex) {
517                for (int i = 0; i < uris.length; i++) {
518                    URI uri = uris[i];
519    
520                    boolean match = false;
521                    for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
522                        FanoutTransportHandler th = iter.next();
523                        if (th.uri.equals(uri)) {
524                            match = true;
525                            break;
526                        }
527                    }
528                    if (!match) {
529                        FanoutTransportHandler th = new FanoutTransportHandler(uri);
530                        transports.add(th);
531                        reconnect();
532                    }
533                }
534            }
535    
536        }
537    
538        public void remove(boolean rebalance,URI uris[]) {
539    
540            synchronized (reconnectMutex) {
541                for (int i = 0; i < uris.length; i++) {
542                    URI uri = uris[i];
543    
544                    for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
545                        FanoutTransportHandler th = iter.next();
546                        if (th.uri.equals(uri)) {
547                            if (th.transport != null) {
548                                ServiceSupport.dispose(th.transport);
549                                connectedCount--;
550                            }
551                            iter.remove();
552                            break;
553                        }
554                    }
555                }
556            }
557    
558        }
559        
560        public void reconnect(URI uri) throws IOException {
561                    add(true,new URI[]{uri});
562                    
563            }
564        
565        public boolean isReconnectSupported() {
566            return true;
567        }
568    
569        public boolean isUpdateURIsSupported() {
570            return true;
571        }
572        public void updateURIs(boolean reblance,URI[] uris) throws IOException {
573            add(reblance,uris);
574        }
575    
576    
577        public String getRemoteAddress() {
578            if (primary != null) {
579                if (primary.transport != null) {
580                    return primary.transport.getRemoteAddress();
581                }
582            }
583            return null;
584        }
585    
586        protected void transportListenerOnCommand(Command command) {
587            if (transportListener != null) {
588                transportListener.onCommand(command);
589            }
590        }
591    
592        public boolean isFaultTolerant() {
593            return true;
594        }
595    
596        public boolean isFanOutQueues() {
597            return fanOutQueues;
598        }
599    
600        public void setFanOutQueues(boolean fanOutQueues) {
601            this.fanOutQueues = fanOutQueues;
602        }
603    
604            public boolean isDisposed() {
605                    return disposed;
606            }
607            
608    
609        public boolean isConnected() {
610            return connected;
611        }
612    
613        public int getReceiveCounter() {
614            int rc = 0;
615            synchronized (reconnectMutex) {
616                for (FanoutTransportHandler th : transports) {
617                    if (th.transport != null) {
618                        rc += th.transport.getReceiveCounter();
619                    }
620                }
621            }
622            return rc;
623        }
624    }