001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.fanout;
018
019 import java.io.IOException;
020 import java.io.InterruptedIOException;
021 import java.net.URI;
022 import java.util.ArrayList;
023 import java.util.Iterator;
024 import java.util.concurrent.ConcurrentHashMap;
025 import java.util.concurrent.atomic.AtomicInteger;
026
027 import org.apache.activemq.command.Command;
028 import org.apache.activemq.command.ConsumerInfo;
029 import org.apache.activemq.command.Message;
030 import org.apache.activemq.command.RemoveInfo;
031 import org.apache.activemq.command.Response;
032 import org.apache.activemq.state.ConnectionStateTracker;
033 import org.apache.activemq.thread.Task;
034 import org.apache.activemq.thread.TaskRunner;
035 import org.apache.activemq.thread.TaskRunnerFactory;
036 import org.apache.activemq.transport.CompositeTransport;
037 import org.apache.activemq.transport.DefaultTransportListener;
038 import org.apache.activemq.transport.FutureResponse;
039 import org.apache.activemq.transport.ResponseCallback;
040 import org.apache.activemq.transport.Transport;
041 import org.apache.activemq.transport.TransportFactory;
042 import org.apache.activemq.transport.TransportListener;
043 import org.apache.activemq.util.IOExceptionSupport;
044 import org.apache.activemq.util.ServiceStopper;
045 import org.apache.activemq.util.ServiceSupport;
046 import org.slf4j.Logger;
047 import org.slf4j.LoggerFactory;
048
049 /**
050 * A Transport that fans out a connection to multiple brokers.
051 *
052 *
053 */
054 public class FanoutTransport implements CompositeTransport {
055
056 private static final Logger LOG = LoggerFactory.getLogger(FanoutTransport.class);
057
058 private TransportListener transportListener;
059 private boolean disposed;
060 private boolean connected;
061
062 private final Object reconnectMutex = new Object();
063 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
064 private final ConcurrentHashMap<Integer, RequestCounter> requestMap = new ConcurrentHashMap<Integer, RequestCounter>();
065
066 private final TaskRunnerFactory reconnectTaskFactory;
067 private final TaskRunner reconnectTask;
068 private boolean started;
069
070 private final ArrayList<FanoutTransportHandler> transports = new ArrayList<FanoutTransportHandler>();
071 private int connectedCount;
072
073 private int minAckCount = 2;
074
075 private long initialReconnectDelay = 10;
076 private long maxReconnectDelay = 1000 * 30;
077 private long backOffMultiplier = 2;
078 private final boolean useExponentialBackOff = true;
079 private int maxReconnectAttempts;
080 private Exception connectionFailure;
081 private FanoutTransportHandler primary;
082 private boolean fanOutQueues = false;
083
084 static class RequestCounter {
085
086 final Command command;
087 final AtomicInteger ackCount;
088
089 RequestCounter(Command command, int count) {
090 this.command = command;
091 this.ackCount = new AtomicInteger(count);
092 }
093
094 @Override
095 public String toString() {
096 return command.getCommandId() + "=" + ackCount.get();
097 }
098 }
099
100 class FanoutTransportHandler extends DefaultTransportListener {
101
102 private final URI uri;
103 private Transport transport;
104
105 private int connectFailures;
106 private long reconnectDelay = initialReconnectDelay;
107 private long reconnectDate;
108
109 public FanoutTransportHandler(URI uri) {
110 this.uri = uri;
111 }
112
113 @Override
114 public void onCommand(Object o) {
115 Command command = (Command)o;
116 if (command.isResponse()) {
117 Integer id = new Integer(((Response)command).getCorrelationId());
118 RequestCounter rc = requestMap.get(id);
119 if (rc != null) {
120 if (rc.ackCount.decrementAndGet() <= 0) {
121 requestMap.remove(id);
122 transportListenerOnCommand(command);
123 }
124 } else {
125 transportListenerOnCommand(command);
126 }
127 } else {
128 transportListenerOnCommand(command);
129 }
130 }
131
132 @Override
133 public void onException(IOException error) {
134 try {
135 synchronized (reconnectMutex) {
136 if (transport == null || !transport.isConnected()) {
137 return;
138 }
139
140 LOG.debug("Transport failed, starting up reconnect task", error);
141
142 ServiceSupport.dispose(transport);
143 transport = null;
144 connectedCount--;
145 if (primary == this) {
146 primary = null;
147 }
148 reconnectTask.wakeup();
149 }
150 } catch (InterruptedException e) {
151 Thread.currentThread().interrupt();
152 if (transportListener != null) {
153 transportListener.onException(new InterruptedIOException());
154 }
155 }
156 }
157 }
158
159 public FanoutTransport() throws InterruptedIOException {
160 // Setup a task that is used to reconnect the a connection async.
161 reconnectTaskFactory = new TaskRunnerFactory();
162 reconnectTaskFactory.init();
163 reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
164 public boolean iterate() {
165 return doConnect();
166 }
167 }, "ActiveMQ Fanout Worker: " + System.identityHashCode(this));
168 }
169
170 /**
171 * @return
172 */
173 private boolean doConnect() {
174 long closestReconnectDate = 0;
175 synchronized (reconnectMutex) {
176
177 if (disposed || connectionFailure != null) {
178 reconnectMutex.notifyAll();
179 }
180
181 if (transports.size() == connectedCount || disposed || connectionFailure != null) {
182 return false;
183 } else {
184
185 if (transports.isEmpty()) {
186 // connectionFailure = new IOException("No uris available to
187 // connect to.");
188 } else {
189
190 // Try to connect them up.
191 Iterator<FanoutTransportHandler> iter = transports.iterator();
192 for (int i = 0; iter.hasNext() && !disposed; i++) {
193
194 long now = System.currentTimeMillis();
195
196 FanoutTransportHandler fanoutHandler = iter.next();
197 if (fanoutHandler.transport != null) {
198 continue;
199 }
200
201 // Are we waiting a little to try to reconnect this one?
202 if (fanoutHandler.reconnectDate != 0 && fanoutHandler.reconnectDate > now) {
203 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
204 closestReconnectDate = fanoutHandler.reconnectDate;
205 }
206 continue;
207 }
208
209 URI uri = fanoutHandler.uri;
210 try {
211 LOG.debug("Stopped: " + this);
212 LOG.debug("Attempting connect to: " + uri);
213 Transport t = TransportFactory.compositeConnect(uri);
214 fanoutHandler.transport = t;
215 t.setTransportListener(fanoutHandler);
216 if (started) {
217 restoreTransport(fanoutHandler);
218 }
219 LOG.debug("Connection established");
220 fanoutHandler.reconnectDelay = initialReconnectDelay;
221 fanoutHandler.connectFailures = 0;
222 if (primary == null) {
223 primary = fanoutHandler;
224 }
225 connectedCount++;
226 } catch (Exception e) {
227 LOG.debug("Connect fail to: " + uri + ", reason: " + e);
228
229 if( fanoutHandler.transport !=null ) {
230 ServiceSupport.dispose(fanoutHandler.transport);
231 fanoutHandler.transport=null;
232 }
233
234 if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
235 LOG.error("Failed to connect to transport after: " + fanoutHandler.connectFailures + " attempt(s)");
236 connectionFailure = e;
237 reconnectMutex.notifyAll();
238 return false;
239 } else {
240
241 if (useExponentialBackOff) {
242 // Exponential increment of reconnect delay.
243 fanoutHandler.reconnectDelay *= backOffMultiplier;
244 if (fanoutHandler.reconnectDelay > maxReconnectDelay) {
245 fanoutHandler.reconnectDelay = maxReconnectDelay;
246 }
247 }
248
249 fanoutHandler.reconnectDate = now + fanoutHandler.reconnectDelay;
250
251 if (closestReconnectDate == 0 || fanoutHandler.reconnectDate < closestReconnectDate) {
252 closestReconnectDate = fanoutHandler.reconnectDate;
253 }
254 }
255 }
256 }
257 if (transports.size() == connectedCount || disposed) {
258 reconnectMutex.notifyAll();
259 return false;
260 }
261
262 }
263 }
264
265 }
266
267 try {
268 long reconnectDelay = closestReconnectDate - System.currentTimeMillis();
269 if (reconnectDelay > 0) {
270 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
271 Thread.sleep(reconnectDelay);
272 }
273 } catch (InterruptedException e1) {
274 Thread.currentThread().interrupt();
275 }
276 return true;
277 }
278
279 public void start() throws Exception {
280 synchronized (reconnectMutex) {
281 LOG.debug("Started.");
282 if (started) {
283 return;
284 }
285 started = true;
286 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
287 FanoutTransportHandler th = iter.next();
288 if (th.transport != null) {
289 restoreTransport(th);
290 }
291 }
292 connected=true;
293 }
294 }
295
296 public void stop() throws Exception {
297 try {
298 synchronized (reconnectMutex) {
299 ServiceStopper ss = new ServiceStopper();
300
301 if (!started) {
302 return;
303 }
304 started = false;
305 disposed = true;
306 connected=false;
307
308 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
309 FanoutTransportHandler th = iter.next();
310 if (th.transport != null) {
311 ss.stop(th.transport);
312 }
313 }
314
315 LOG.debug("Stopped: " + this);
316 ss.throwFirstException();
317 }
318 } finally {
319 reconnectTask.shutdown();
320 reconnectTaskFactory.shutdownNow();
321 }
322 }
323
324 public int getMinAckCount() {
325 return minAckCount;
326 }
327
328 public void setMinAckCount(int minAckCount) {
329 this.minAckCount = minAckCount;
330 }
331
332 public long getInitialReconnectDelay() {
333 return initialReconnectDelay;
334 }
335
336 public void setInitialReconnectDelay(long initialReconnectDelay) {
337 this.initialReconnectDelay = initialReconnectDelay;
338 }
339
340 public long getMaxReconnectDelay() {
341 return maxReconnectDelay;
342 }
343
344 public void setMaxReconnectDelay(long maxReconnectDelay) {
345 this.maxReconnectDelay = maxReconnectDelay;
346 }
347
348 public long getReconnectDelayExponent() {
349 return backOffMultiplier;
350 }
351
352 public void setReconnectDelayExponent(long reconnectDelayExponent) {
353 this.backOffMultiplier = reconnectDelayExponent;
354 }
355
356 public int getMaxReconnectAttempts() {
357 return maxReconnectAttempts;
358 }
359
360 public void setMaxReconnectAttempts(int maxReconnectAttempts) {
361 this.maxReconnectAttempts = maxReconnectAttempts;
362 }
363
364 public void oneway(Object o) throws IOException {
365 final Command command = (Command)o;
366 try {
367 synchronized (reconnectMutex) {
368
369 // Wait for transport to be connected.
370 while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
371 LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
372 reconnectMutex.wait(1000);
373 }
374
375 // Still not fully connected.
376 if (connectedCount < minAckCount) {
377
378 Exception error;
379
380 // Throw the right kind of error..
381 if (disposed) {
382 error = new IOException("Transport disposed.");
383 } else if (connectionFailure != null) {
384 error = connectionFailure;
385 } else {
386 error = new IOException("Unexpected failure.");
387 }
388
389 if (error instanceof IOException) {
390 throw (IOException)error;
391 }
392 throw IOExceptionSupport.create(error);
393 }
394
395 // If it was a request and it was not being tracked by
396 // the state tracker,
397 // then hold it in the requestMap so that we can replay
398 // it later.
399 boolean fanout = isFanoutCommand(command);
400 if (stateTracker.track(command) == null && command.isResponseRequired()) {
401 int size = fanout ? minAckCount : 1;
402 requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
403 }
404
405 // Send the message.
406 if (fanout) {
407 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
408 FanoutTransportHandler th = iter.next();
409 if (th.transport != null) {
410 try {
411 th.transport.oneway(command);
412 } catch (IOException e) {
413 LOG.debug("Send attempt: failed.");
414 th.onException(e);
415 }
416 }
417 }
418 } else {
419 try {
420 primary.transport.oneway(command);
421 } catch (IOException e) {
422 LOG.debug("Send attempt: failed.");
423 primary.onException(e);
424 }
425 }
426
427 }
428 } catch (InterruptedException e) {
429 // Some one may be trying to stop our thread.
430 Thread.currentThread().interrupt();
431 throw new InterruptedIOException();
432 }
433 }
434
435 /**
436 * @param command
437 * @return
438 */
439 private boolean isFanoutCommand(Command command) {
440 if (command.isMessage()) {
441 if( fanOutQueues ) {
442 return true;
443 }
444 return ((Message)command).getDestination().isTopic();
445 }
446 if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ||
447 command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
448 return false;
449 }
450 return true;
451 }
452
453 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
454 throw new AssertionError("Unsupported Method");
455 }
456
457 public Object request(Object command) throws IOException {
458 throw new AssertionError("Unsupported Method");
459 }
460
461 public Object request(Object command, int timeout) throws IOException {
462 throw new AssertionError("Unsupported Method");
463 }
464
465 public void reconnect() {
466 LOG.debug("Waking up reconnect task");
467 try {
468 reconnectTask.wakeup();
469 } catch (InterruptedException e) {
470 Thread.currentThread().interrupt();
471 }
472 }
473
474 public TransportListener getTransportListener() {
475 return transportListener;
476 }
477
478 public void setTransportListener(TransportListener commandListener) {
479 this.transportListener = commandListener;
480 }
481
482 public <T> T narrow(Class<T> target) {
483
484 if (target.isAssignableFrom(getClass())) {
485 return target.cast(this);
486 }
487
488 synchronized (reconnectMutex) {
489 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
490 FanoutTransportHandler th = iter.next();
491 if (th.transport != null) {
492 T rc = th.transport.narrow(target);
493 if (rc != null) {
494 return rc;
495 }
496 }
497 }
498 }
499
500 return null;
501
502 }
503
504 protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
505 th.transport.start();
506 stateTracker.setRestoreConsumers(th.transport == primary);
507 stateTracker.restore(th.transport);
508 for (Iterator<RequestCounter> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
509 RequestCounter rc = iter2.next();
510 th.transport.oneway(rc.command);
511 }
512 }
513
514 public void add(boolean reblance,URI uris[]) {
515
516 synchronized (reconnectMutex) {
517 for (int i = 0; i < uris.length; i++) {
518 URI uri = uris[i];
519
520 boolean match = false;
521 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
522 FanoutTransportHandler th = iter.next();
523 if (th.uri.equals(uri)) {
524 match = true;
525 break;
526 }
527 }
528 if (!match) {
529 FanoutTransportHandler th = new FanoutTransportHandler(uri);
530 transports.add(th);
531 reconnect();
532 }
533 }
534 }
535
536 }
537
538 public void remove(boolean rebalance,URI uris[]) {
539
540 synchronized (reconnectMutex) {
541 for (int i = 0; i < uris.length; i++) {
542 URI uri = uris[i];
543
544 for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
545 FanoutTransportHandler th = iter.next();
546 if (th.uri.equals(uri)) {
547 if (th.transport != null) {
548 ServiceSupport.dispose(th.transport);
549 connectedCount--;
550 }
551 iter.remove();
552 break;
553 }
554 }
555 }
556 }
557
558 }
559
560 public void reconnect(URI uri) throws IOException {
561 add(true,new URI[]{uri});
562
563 }
564
565 public boolean isReconnectSupported() {
566 return true;
567 }
568
569 public boolean isUpdateURIsSupported() {
570 return true;
571 }
572 public void updateURIs(boolean reblance,URI[] uris) throws IOException {
573 add(reblance,uris);
574 }
575
576
577 public String getRemoteAddress() {
578 if (primary != null) {
579 if (primary.transport != null) {
580 return primary.transport.getRemoteAddress();
581 }
582 }
583 return null;
584 }
585
586 protected void transportListenerOnCommand(Command command) {
587 if (transportListener != null) {
588 transportListener.onCommand(command);
589 }
590 }
591
592 public boolean isFaultTolerant() {
593 return true;
594 }
595
596 public boolean isFanOutQueues() {
597 return fanOutQueues;
598 }
599
600 public void setFanOutQueues(boolean fanOutQueues) {
601 this.fanOutQueues = fanOutQueues;
602 }
603
604 public boolean isDisposed() {
605 return disposed;
606 }
607
608
609 public boolean isConnected() {
610 return connected;
611 }
612
613 public int getReceiveCounter() {
614 int rc = 0;
615 synchronized (reconnectMutex) {
616 for (FanoutTransportHandler th : transports) {
617 if (th.transport != null) {
618 rc += th.transport.getReceiveCounter();
619 }
620 }
621 }
622 return rc;
623 }
624 }