001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.transport.discovery.multicast;
019
020 import java.io.IOException;
021 import java.net.DatagramPacket;
022 import java.net.InetAddress;
023 import java.net.InetSocketAddress;
024 import java.net.MulticastSocket;
025 import java.net.NetworkInterface;
026 import java.net.SocketAddress;
027 import java.net.SocketTimeoutException;
028 import java.net.URI;
029 import java.util.Iterator;
030 import java.util.Map;
031 import java.util.concurrent.ConcurrentHashMap;
032 import java.util.concurrent.ExecutorService;
033 import java.util.concurrent.LinkedBlockingQueue;
034 import java.util.concurrent.ThreadFactory;
035 import java.util.concurrent.ThreadPoolExecutor;
036 import java.util.concurrent.TimeUnit;
037 import java.util.concurrent.atomic.AtomicBoolean;
038
039 import org.apache.activemq.command.DiscoveryEvent;
040 import org.apache.activemq.transport.discovery.DiscoveryAgent;
041 import org.apache.activemq.transport.discovery.DiscoveryListener;
042 import org.apache.activemq.util.ThreadPoolUtils;
043 import org.slf4j.Logger;
044 import org.slf4j.LoggerFactory;
045
046 /**
047 * A {@link DiscoveryAgent} using a multicast address and heartbeat packets
048 * encoded using any wireformat, but openwire by default.
049 *
050 *
051 */
052 public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
053
054 public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
055 public static final String DEFAULT_HOST_STR = "default";
056 public static final String DEFAULT_HOST_IP = System.getProperty("activemq.partition.discovery", "239.255.2.3");
057 public static final int DEFAULT_PORT = 6155;
058
059 private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class);
060 private static final String TYPE_SUFFIX = "ActiveMQ-4.";
061 private static final String ALIVE = "alive.";
062 private static final String DEAD = "dead.";
063 private static final String DELIMITER = "%";
064 private static final int BUFF_SIZE = 8192;
065 private static final int DEFAULT_IDLE_TIME = 500;
066 private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;
067
068 private long initialReconnectDelay = 1000 * 5;
069 private long maxReconnectDelay = 1000 * 30;
070 private long backOffMultiplier = 2;
071 private boolean useExponentialBackOff;
072 private int maxReconnectAttempts;
073
074 private int timeToLive = 1;
075 private boolean loopBackMode;
076 private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
077 private String group = "default";
078 private URI discoveryURI;
079 private InetAddress inetAddress;
080 private SocketAddress sockAddress;
081 private DiscoveryListener discoveryListener;
082 private String selfService;
083 private MulticastSocket mcast;
084 private Thread runner;
085 private long keepAliveInterval = DEFAULT_IDLE_TIME;
086 private String mcInterface;
087 private String mcNetworkInterface;
088 private String mcJoinNetworkInterface;
089 private long lastAdvertizeTime;
090 private AtomicBoolean started = new AtomicBoolean(false);
091 private boolean reportAdvertizeFailed = true;
092 private ExecutorService executor = null;
093
094 class RemoteBrokerData extends DiscoveryEvent {
095 long lastHeartBeat;
096 long recoveryTime;
097 int failureCount;
098 boolean failed;
099
100 public RemoteBrokerData(String brokerName, String service) {
101 super(service);
102 setBrokerName(brokerName);
103 this.lastHeartBeat = System.currentTimeMillis();
104 }
105
106 public synchronized void updateHeartBeat() {
107 lastHeartBeat = System.currentTimeMillis();
108
109 // Consider that the broker recovery has succeeded if it has not
110 // failed in 60 seconds.
111 if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
112 if (LOG.isDebugEnabled()) {
113 LOG.debug("I now think that the " + serviceName + " service has recovered.");
114 }
115 failureCount = 0;
116 recoveryTime = 0;
117 }
118 }
119
120 public synchronized long getLastHeartBeat() {
121 return lastHeartBeat;
122 }
123
124 public synchronized boolean markFailed() {
125 if (!failed) {
126 failed = true;
127 failureCount++;
128
129 long reconnectDelay;
130 if (!useExponentialBackOff) {
131 reconnectDelay = initialReconnectDelay;
132 } else {
133 reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
134 if (reconnectDelay > maxReconnectDelay) {
135 reconnectDelay = maxReconnectDelay;
136 }
137 }
138
139 if (LOG.isDebugEnabled()) {
140 LOG.debug("Remote failure of " + serviceName + " while still receiving multicast advertisements. Advertising events will be suppressed for " + reconnectDelay
141 + " ms, the current failure count is: " + failureCount);
142 }
143
144 recoveryTime = System.currentTimeMillis() + reconnectDelay;
145 return true;
146 }
147 return false;
148 }
149
150 /**
151 * @return true if this broker is marked failed and it is now the right
152 * time to start recovery.
153 */
154 public synchronized boolean doRecovery() {
155 if (!failed) {
156 return false;
157 }
158
159 // Are we done trying to recover this guy?
160 if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
161 if (LOG.isDebugEnabled()) {
162 LOG.debug("Max reconnect attempts of the " + serviceName + " service has been reached.");
163 }
164 return false;
165 }
166
167 // Is it not yet time?
168 if (System.currentTimeMillis() < recoveryTime) {
169 return false;
170 }
171
172 if (LOG.isDebugEnabled()) {
173 LOG.debug("Resuming event advertisement of the " + serviceName + " service.");
174 }
175 failed = false;
176 return true;
177 }
178
179 public boolean isFailed() {
180 return failed;
181 }
182 }
183
184 /**
185 * Set the discovery listener
186 *
187 * @param listener
188 */
189 public void setDiscoveryListener(DiscoveryListener listener) {
190 this.discoveryListener = listener;
191 }
192
193 /**
194 * register a service
195 */
196 public void registerService(String name) throws IOException {
197 this.selfService = name;
198 if (started.get()) {
199 doAdvertizeSelf();
200 }
201 }
202
203 /**
204 * @return Returns the loopBackMode.
205 */
206 public boolean isLoopBackMode() {
207 return loopBackMode;
208 }
209
210 /**
211 * @param loopBackMode The loopBackMode to set.
212 */
213 public void setLoopBackMode(boolean loopBackMode) {
214 this.loopBackMode = loopBackMode;
215 }
216
217 /**
218 * @return Returns the timeToLive.
219 */
220 public int getTimeToLive() {
221 return timeToLive;
222 }
223
224 /**
225 * @param timeToLive The timeToLive to set.
226 */
227 public void setTimeToLive(int timeToLive) {
228 this.timeToLive = timeToLive;
229 }
230
231 /**
232 * @return the discoveryURI
233 */
234 public URI getDiscoveryURI() {
235 return discoveryURI;
236 }
237
238 /**
239 * Set the discoveryURI
240 *
241 * @param discoveryURI
242 */
243 public void setDiscoveryURI(URI discoveryURI) {
244 this.discoveryURI = discoveryURI;
245 }
246
247 public long getKeepAliveInterval() {
248 return keepAliveInterval;
249 }
250
251 public void setKeepAliveInterval(long keepAliveInterval) {
252 this.keepAliveInterval = keepAliveInterval;
253 }
254
255 public void setInterface(String mcInterface) {
256 this.mcInterface = mcInterface;
257 }
258
259 public void setNetworkInterface(String mcNetworkInterface) {
260 this.mcNetworkInterface = mcNetworkInterface;
261 }
262
263 public void setJoinNetworkInterface(String mcJoinNetwrokInterface) {
264 this.mcJoinNetworkInterface = mcJoinNetwrokInterface;
265 }
266
267 /**
268 * start the discovery agent
269 *
270 * @throws Exception
271 */
272 public void start() throws Exception {
273
274 if (started.compareAndSet(false, true)) {
275
276 if (group == null || group.length() == 0) {
277 throw new IOException("You must specify a group to discover");
278 }
279 String type = getType();
280 if (!type.endsWith(".")) {
281 LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
282 type += ".";
283 }
284
285 if (discoveryURI == null) {
286 discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
287 }
288
289 if (LOG.isTraceEnabled())
290 LOG.trace("start - discoveryURI = " + discoveryURI);
291
292 String myHost = discoveryURI.getHost();
293 int myPort = discoveryURI.getPort();
294
295 if( DEFAULT_HOST_STR.equals(myHost) )
296 myHost = DEFAULT_HOST_IP;
297
298 if(myPort < 0 )
299 myPort = DEFAULT_PORT;
300
301 if (LOG.isTraceEnabled()) {
302 LOG.trace("start - myHost = " + myHost);
303 LOG.trace("start - myPort = " + myPort);
304 LOG.trace("start - group = " + group );
305 LOG.trace("start - interface = " + mcInterface );
306 LOG.trace("start - network interface = " + mcNetworkInterface );
307 LOG.trace("start - join network interface = " + mcJoinNetworkInterface );
308 }
309
310 this.inetAddress = InetAddress.getByName(myHost);
311 this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
312 mcast = new MulticastSocket(myPort);
313 mcast.setLoopbackMode(loopBackMode);
314 mcast.setTimeToLive(getTimeToLive());
315 if (mcJoinNetworkInterface != null) {
316 mcast.joinGroup(sockAddress, NetworkInterface.getByName(mcJoinNetworkInterface));
317 }
318 else {
319 mcast.joinGroup(inetAddress);
320 }
321 mcast.setSoTimeout((int)keepAliveInterval);
322 if (mcInterface != null) {
323 mcast.setInterface(InetAddress.getByName(mcInterface));
324 }
325 if (mcNetworkInterface != null) {
326 mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
327 }
328 runner = new Thread(this);
329 runner.setName(this.toString() + ":" + runner.getName());
330 runner.setDaemon(true);
331 runner.start();
332 doAdvertizeSelf();
333 }
334 }
335
336 /**
337 * stop the channel
338 *
339 * @throws Exception
340 */
341 public void stop() throws Exception {
342 if (started.compareAndSet(true, false)) {
343 doAdvertizeSelf();
344 if (mcast != null) {
345 mcast.close();
346 }
347 if (runner != null) {
348 runner.interrupt();
349 }
350 if (executor != null) {
351 ThreadPoolUtils.shutdownNow(executor);
352 executor = null;
353 }
354 }
355 }
356
357 public String getType() {
358 return group + "." + TYPE_SUFFIX;
359 }
360
361 public void run() {
362 byte[] buf = new byte[BUFF_SIZE];
363 DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
364 while (started.get()) {
365 doTimeKeepingServices();
366 try {
367 mcast.receive(packet);
368 if (packet.getLength() > 0) {
369 String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
370 processData(str);
371 }
372 } catch (SocketTimeoutException se) {
373 // ignore
374 } catch (IOException e) {
375 if (started.get()) {
376 LOG.error("failed to process packet: " + e);
377 }
378 }
379 }
380 }
381
382 private void processData(String str) {
383 if (discoveryListener != null) {
384 if (str.startsWith(getType())) {
385 String payload = str.substring(getType().length());
386 if (payload.startsWith(ALIVE)) {
387 String brokerName = getBrokerName(payload.substring(ALIVE.length()));
388 String service = payload.substring(ALIVE.length() + brokerName.length() + 2);
389 processAlive(brokerName, service);
390 } else {
391 String brokerName = getBrokerName(payload.substring(DEAD.length()));
392 String service = payload.substring(DEAD.length() + brokerName.length() + 2);
393 processDead(service);
394 }
395 }
396 }
397 }
398
399 private void doTimeKeepingServices() {
400 if (started.get()) {
401 long currentTime = System.currentTimeMillis();
402 if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
403 doAdvertizeSelf();
404 lastAdvertizeTime = currentTime;
405 }
406 doExpireOldServices();
407 }
408 }
409
410 private void doAdvertizeSelf() {
411 if (selfService != null) {
412 String payload = getType();
413 payload += started.get() ? ALIVE : DEAD;
414 payload += DELIMITER + "localhost" + DELIMITER;
415 payload += selfService;
416 try {
417 byte[] data = payload.getBytes();
418 DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
419 mcast.send(packet);
420 } catch (IOException e) {
421 // If a send fails, chances are all subsequent sends will fail
422 // too.. No need to keep reporting the
423 // same error over and over.
424 if (reportAdvertizeFailed) {
425 reportAdvertizeFailed = false;
426 LOG.error("Failed to advertise our service: " + payload, e);
427 if ("Operation not permitted".equals(e.getMessage())) {
428 LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup. "
429 + "Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress());
430 }
431 }
432 }
433 }
434 }
435
436 private void processAlive(String brokerName, String service) {
437 if (selfService == null || !service.equals(selfService)) {
438 RemoteBrokerData data = brokersByService.get(service);
439 if (data == null) {
440 data = new RemoteBrokerData(brokerName, service);
441 brokersByService.put(service, data);
442 fireServiceAddEvent(data);
443 doAdvertizeSelf();
444 } else {
445 data.updateHeartBeat();
446 if (data.doRecovery()) {
447 fireServiceAddEvent(data);
448 }
449 }
450 }
451 }
452
453 private void processDead(String service) {
454 if (!service.equals(selfService)) {
455 RemoteBrokerData data = brokersByService.remove(service);
456 if (data != null && !data.isFailed()) {
457 fireServiceRemovedEvent(data);
458 }
459 }
460 }
461
462 private void doExpireOldServices() {
463 long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
464 for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
465 RemoteBrokerData data = i.next();
466 if (data.getLastHeartBeat() < expireTime) {
467 processDead(data.getServiceName());
468 }
469 }
470 }
471
472 private String getBrokerName(String str) {
473 String result = null;
474 int start = str.indexOf(DELIMITER);
475 if (start >= 0) {
476 int end = str.indexOf(DELIMITER, start + 1);
477 result = str.substring(start + 1, end);
478 }
479 return result;
480 }
481
482 public void serviceFailed(DiscoveryEvent event) throws IOException {
483 RemoteBrokerData data = brokersByService.get(event.getServiceName());
484 if (data != null && data.markFailed()) {
485 fireServiceRemovedEvent(data);
486 }
487 }
488
489 private void fireServiceRemovedEvent(final RemoteBrokerData data) {
490 if (discoveryListener != null && started.get()) {
491 // Have the listener process the event async so that
492 // he does not block this thread since we are doing time sensitive
493 // processing of events.
494 getExecutor().execute(new Runnable() {
495 public void run() {
496 DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
497 if (discoveryListener != null) {
498 discoveryListener.onServiceRemove(data);
499 }
500 }
501 });
502 }
503 }
504
505 private void fireServiceAddEvent(final RemoteBrokerData data) {
506 if (discoveryListener != null && started.get()) {
507
508 // Have the listener process the event async so that
509 // he does not block this thread since we are doing time sensitive
510 // processing of events.
511 getExecutor().execute(new Runnable() {
512 public void run() {
513 DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
514 if (discoveryListener != null) {
515 discoveryListener.onServiceAdd(data);
516 }
517 }
518 });
519 }
520 }
521
522 private ExecutorService getExecutor() {
523 if (executor == null) {
524 final String threadName = "Notifier-" + this.toString();
525 executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
526 public Thread newThread(Runnable runable) {
527 Thread t = new Thread(runable, threadName);
528 t.setDaemon(true);
529 return t;
530 }
531 });
532 }
533 return executor;
534 }
535
536 public long getBackOffMultiplier() {
537 return backOffMultiplier;
538 }
539
540 public void setBackOffMultiplier(long backOffMultiplier) {
541 this.backOffMultiplier = backOffMultiplier;
542 }
543
544 public long getInitialReconnectDelay() {
545 return initialReconnectDelay;
546 }
547
548 public void setInitialReconnectDelay(long initialReconnectDelay) {
549 this.initialReconnectDelay = initialReconnectDelay;
550 }
551
552 public int getMaxReconnectAttempts() {
553 return maxReconnectAttempts;
554 }
555
556 public void setMaxReconnectAttempts(int maxReconnectAttempts) {
557 this.maxReconnectAttempts = maxReconnectAttempts;
558 }
559
560 public long getMaxReconnectDelay() {
561 return maxReconnectDelay;
562 }
563
564 public void setMaxReconnectDelay(long maxReconnectDelay) {
565 this.maxReconnectDelay = maxReconnectDelay;
566 }
567
568 public boolean isUseExponentialBackOff() {
569 return useExponentialBackOff;
570 }
571
572 public void setUseExponentialBackOff(boolean useExponentialBackOff) {
573 this.useExponentialBackOff = useExponentialBackOff;
574 }
575
576 public void setGroup(String group) {
577 this.group = group;
578 }
579
580 @Override
581 public String toString() {
582 return "MulticastDiscoveryAgent-"
583 + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
584 }
585 }