001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.transport.discovery.multicast;
019
020import java.io.IOException;
021import java.net.DatagramPacket;
022import java.net.InetAddress;
023import java.net.InetSocketAddress;
024import java.net.InterfaceAddress;
025import java.net.MulticastSocket;
026import java.net.NetworkInterface;
027import java.net.SocketAddress;
028import java.net.SocketException;
029import java.net.SocketTimeoutException;
030import java.net.URI;
031import java.util.ArrayList;
032import java.util.Enumeration;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Map;
036import java.util.concurrent.ConcurrentHashMap;
037import java.util.concurrent.ExecutorService;
038import java.util.concurrent.LinkedBlockingQueue;
039import java.util.concurrent.ThreadFactory;
040import java.util.concurrent.ThreadPoolExecutor;
041import java.util.concurrent.TimeUnit;
042import java.util.concurrent.atomic.AtomicBoolean;
043
044import org.apache.activemq.command.DiscoveryEvent;
045import org.apache.activemq.transport.discovery.DiscoveryAgent;
046import org.apache.activemq.transport.discovery.DiscoveryListener;
047import org.apache.activemq.util.ThreadPoolUtils;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * A {@link DiscoveryAgent} using a multicast address and heartbeat packets
053 * encoded using any wireformat, but openwire by default.
054 * 
055 * 
056 */
057public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
058
059    public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
060    public static final String DEFAULT_HOST_STR = "default"; 
061    public static final String DEFAULT_HOST_IP  = System.getProperty("activemq.partition.discovery", "239.255.2.3"); 
062    public static final int    DEFAULT_PORT  = 6155; 
063        
064    private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class);
065    private static final String TYPE_SUFFIX = "ActiveMQ-4.";
066    private static final String ALIVE = "alive.";
067    private static final String DEAD = "dead.";
068    private static final String DELIMITER = "%";
069    private static final int BUFF_SIZE = 8192;
070    private static final int DEFAULT_IDLE_TIME = 500;
071    private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;
072
073    private long initialReconnectDelay = 1000 * 5;
074    private long maxReconnectDelay = 1000 * 30;
075    private long backOffMultiplier = 2;
076    private boolean useExponentialBackOff;
077    private int maxReconnectAttempts;
078
079    private int timeToLive = 1;
080    private boolean loopBackMode;
081    private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
082    private String group = "default";
083    private URI discoveryURI;
084    private InetAddress inetAddress;
085    private SocketAddress sockAddress;
086    private DiscoveryListener discoveryListener;
087    private String selfService;
088    private MulticastSocket mcast;
089    private Thread runner;
090    private long keepAliveInterval = DEFAULT_IDLE_TIME;
091    private String mcInterface;
092    private String mcNetworkInterface;
093    private String mcJoinNetworkInterface;
094    private long lastAdvertizeTime;
095    private AtomicBoolean started = new AtomicBoolean(false);
096    private boolean reportAdvertizeFailed = true;
097    private ExecutorService executor = null;
098
099    class RemoteBrokerData extends DiscoveryEvent {
100        long lastHeartBeat;
101        long recoveryTime;
102        int failureCount;
103        boolean failed;
104
105        public RemoteBrokerData(String brokerName, String service) {
106            super(service);
107            setBrokerName(brokerName);
108            this.lastHeartBeat = System.currentTimeMillis();
109        }
110
111        public synchronized void updateHeartBeat() {
112            lastHeartBeat = System.currentTimeMillis();
113
114            // Consider that the broker recovery has succeeded if it has not
115            // failed in 60 seconds.
116            if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
117                if (LOG.isDebugEnabled()) {
118                    LOG.debug("I now think that the " + serviceName + " service has recovered.");
119                }
120                failureCount = 0;
121                recoveryTime = 0;
122            }
123        }
124
125        public synchronized long getLastHeartBeat() {
126            return lastHeartBeat;
127        }
128
129        public synchronized boolean markFailed() {
130            if (!failed) {
131                failed = true;
132                failureCount++;
133
134                long reconnectDelay;
135                if (!useExponentialBackOff) {
136                    reconnectDelay = initialReconnectDelay;
137                } else {
138                    reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
139                    if (reconnectDelay > maxReconnectDelay) {
140                        reconnectDelay = maxReconnectDelay;
141                    }
142                }
143
144                if (LOG.isDebugEnabled()) {
145                    LOG.debug("Remote failure of " + serviceName + " while still receiving multicast advertisements.  Advertising events will be suppressed for " + reconnectDelay
146                              + " ms, the current failure count is: " + failureCount);
147                }
148
149                recoveryTime = System.currentTimeMillis() + reconnectDelay;
150                return true;
151            }
152            return false;
153        }
154
155        /**
156         * @return true if this broker is marked failed and it is now the right
157         *         time to start recovery.
158         */
159        public synchronized boolean doRecovery() {
160            if (!failed) {
161                return false;
162            }
163
164            // Are we done trying to recover this guy?
165            if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
166                if (LOG.isDebugEnabled()) {
167                    LOG.debug("Max reconnect attempts of the " + serviceName + " service has been reached.");
168                }
169                return false;
170            }
171
172            // Is it not yet time?
173            if (System.currentTimeMillis() < recoveryTime) {
174                return false;
175            }
176
177            if (LOG.isDebugEnabled()) {
178                LOG.debug("Resuming event advertisement of the " + serviceName + " service.");
179            }
180            failed = false;
181            return true;
182        }
183
184        public boolean isFailed() {
185            return failed;
186        }
187    }
188
189    /**
190     * Set the discovery listener
191     * 
192     * @param listener
193     */
194    public void setDiscoveryListener(DiscoveryListener listener) {
195        this.discoveryListener = listener;
196    }
197
198    /**
199     * register a service
200     */
201    public void registerService(String name) throws IOException {
202        this.selfService = name;
203        if (started.get()) {
204            doAdvertizeSelf();
205        }
206    }
207
208    /**
209     * @return Returns the loopBackMode.
210     */
211    public boolean isLoopBackMode() {
212        return loopBackMode;
213    }
214
215    /**
216     * @param loopBackMode The loopBackMode to set.
217     */
218    public void setLoopBackMode(boolean loopBackMode) {
219        this.loopBackMode = loopBackMode;
220    }
221
222    /**
223     * @return Returns the timeToLive.
224     */
225    public int getTimeToLive() {
226        return timeToLive;
227    }
228
229    /**
230     * @param timeToLive The timeToLive to set.
231     */
232    public void setTimeToLive(int timeToLive) {
233        this.timeToLive = timeToLive;
234    }
235
236    /**
237     * @return the discoveryURI
238     */
239    public URI getDiscoveryURI() {
240        return discoveryURI;
241    }
242
243    /**
244     * Set the discoveryURI
245     * 
246     * @param discoveryURI
247     */
248    public void setDiscoveryURI(URI discoveryURI) {
249        this.discoveryURI = discoveryURI;
250    }
251
252    public long getKeepAliveInterval() {
253        return keepAliveInterval;
254    }
255
256    public void setKeepAliveInterval(long keepAliveInterval) {
257        this.keepAliveInterval = keepAliveInterval;
258    }
259    
260    public void setInterface(String mcInterface) {
261        this.mcInterface = mcInterface;
262    }
263    
264    public void setNetworkInterface(String mcNetworkInterface) {
265        this.mcNetworkInterface = mcNetworkInterface;    
266    }
267    
268    public void setJoinNetworkInterface(String mcJoinNetwrokInterface) {
269        this.mcJoinNetworkInterface = mcJoinNetwrokInterface;
270    }
271    
272    /**
273     * start the discovery agent
274     * 
275     * @throws Exception
276     */
277    public void start() throws Exception {
278        
279        if (started.compareAndSet(false, true)) {               
280                                
281            if (group == null || group.length() == 0) {
282                throw new IOException("You must specify a group to discover");
283            }
284            String type = getType();
285            if (!type.endsWith(".")) {
286                LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
287                type += ".";
288            }
289            
290            if (discoveryURI == null) {
291                discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
292            }
293            
294            if (LOG.isTraceEnabled()) 
295                        LOG.trace("start - discoveryURI = " + discoveryURI);                                      
296                  
297                  String myHost = discoveryURI.getHost();
298                  int    myPort = discoveryURI.getPort(); 
299                     
300                  if( DEFAULT_HOST_STR.equals(myHost) ) 
301                        myHost = DEFAULT_HOST_IP;                         
302                  
303                  if(myPort < 0 )
304                    myPort = DEFAULT_PORT;                  
305                  
306                  if (LOG.isTraceEnabled()) {
307                        LOG.trace("start - myHost = " + myHost); 
308                        LOG.trace("start - myPort = " + myPort);        
309                        LOG.trace("start - group  = " + group );                                
310                        LOG.trace("start - interface  = " + mcInterface );
311                        LOG.trace("start - network interface  = " + mcNetworkInterface );
312                        LOG.trace("start - join network interface  = " + mcJoinNetworkInterface );
313                  }     
314                  
315            this.inetAddress = InetAddress.getByName(myHost);
316            this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
317            mcast = new MulticastSocket(myPort);
318            mcast.setLoopbackMode(loopBackMode);
319            mcast.setTimeToLive(getTimeToLive());
320            if (mcJoinNetworkInterface != null) {
321                mcast.joinGroup(sockAddress, NetworkInterface.getByName(mcJoinNetworkInterface));
322            }
323            else {
324                mcast.setNetworkInterface(findNetworkInterface());
325                mcast.joinGroup(inetAddress);
326            }
327            mcast.setSoTimeout((int)keepAliveInterval);
328            if (mcInterface != null) {
329                mcast.setInterface(InetAddress.getByName(mcInterface));
330            }
331            if (mcNetworkInterface != null) {
332                mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
333            }
334            runner = new Thread(this);
335            runner.setName(this.toString() + ":" + runner.getName());
336            runner.setDaemon(true);
337            runner.start();
338            doAdvertizeSelf();
339        }
340    }
341    
342    private NetworkInterface findNetworkInterface() throws SocketException {
343        Enumeration<NetworkInterface> ifcs = NetworkInterface.getNetworkInterfaces();
344        List<NetworkInterface> possibles = new ArrayList<NetworkInterface>();
345        while (ifcs.hasMoreElements()) {
346            NetworkInterface ni = ifcs.nextElement();
347            try {
348                if (ni.supportsMulticast()
349                        && ni.isUp()) {
350                    for (InterfaceAddress ia : ni.getInterfaceAddresses()) {
351                        if (ia != null && ia.getAddress() instanceof java.net.Inet4Address
352                                && !ia.getAddress().isLoopbackAddress()
353                                && (ni.getDisplayName()==null || !ni.getDisplayName().startsWith("vnic"))) {
354                            possibles.add(ni);
355                        }
356                    }
357                }
358            } catch (SocketException ignored) {}
359        }
360        return possibles.isEmpty() ? null : possibles.get(possibles.size() - 1);
361    }
362
363    /**
364     * stop the channel
365     * 
366     * @throws Exception
367     */
368    public void stop() throws Exception {
369        if (started.compareAndSet(true, false)) {
370            doAdvertizeSelf();
371            if (mcast != null) {
372                mcast.close();
373            }
374            if (runner != null) {
375                runner.interrupt();
376            }
377            if (executor != null) {
378                ThreadPoolUtils.shutdownNow(executor);
379                executor = null;
380            }
381        }
382    }
383
384    public String getType() {
385        return group + "." + TYPE_SUFFIX;
386    }
387
388    public void run() {
389        byte[] buf = new byte[BUFF_SIZE];
390        DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
391        while (started.get()) {
392            doTimeKeepingServices();
393            try {
394                mcast.receive(packet);
395                if (packet.getLength() > 0) {
396                    String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
397                    processData(str);
398                }
399            } catch (SocketTimeoutException se) {
400                // ignore
401            } catch (IOException e) {
402                if (started.get()) {
403                    LOG.error("failed to process packet: " + e);
404                }
405            }
406        }
407    }
408
409    private void processData(String str) {
410        if (discoveryListener != null) {
411            if (str.startsWith(getType())) {
412                String payload = str.substring(getType().length());
413                if (payload.startsWith(ALIVE)) {
414                    String brokerName = getBrokerName(payload.substring(ALIVE.length()));
415                    String service = payload.substring(ALIVE.length() + brokerName.length() + 2);
416                    processAlive(brokerName, service);
417                } else {
418                    String brokerName = getBrokerName(payload.substring(DEAD.length()));
419                    String service = payload.substring(DEAD.length() + brokerName.length() + 2);
420                    processDead(service);
421                }
422            }
423        }
424    }
425
426    private void doTimeKeepingServices() {
427        if (started.get()) {
428            long currentTime = System.currentTimeMillis();
429            if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
430                doAdvertizeSelf();
431                lastAdvertizeTime = currentTime;
432            }
433            doExpireOldServices();
434        }
435    }
436
437    private void doAdvertizeSelf() {
438        if (selfService != null) {
439            String payload = getType();
440            payload += started.get() ? ALIVE : DEAD;
441            payload += DELIMITER + "localhost" + DELIMITER;
442            payload += selfService;
443            try {
444                byte[] data = payload.getBytes();
445                DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
446                mcast.send(packet);
447            } catch (IOException e) {
448                // If a send fails, chances are all subsequent sends will fail
449                // too.. No need to keep reporting the
450                // same error over and over.
451                if (reportAdvertizeFailed) {
452                    reportAdvertizeFailed = false;
453                    LOG.error("Failed to advertise our service: " + payload, e);
454                    if ("Operation not permitted".equals(e.getMessage())) {
455                        LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
456                                  + "Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress());
457                    }
458                }
459            }
460        }
461    }
462
463    private void processAlive(String brokerName, String service) {
464        if (selfService == null || !service.equals(selfService)) {
465            RemoteBrokerData data = brokersByService.get(service);
466            if (data == null) {
467                data = new RemoteBrokerData(brokerName, service);
468                brokersByService.put(service, data);      
469                fireServiceAddEvent(data);
470                doAdvertizeSelf();
471            } else {
472                data.updateHeartBeat();
473                if (data.doRecovery()) {
474                    fireServiceAddEvent(data);
475                }
476            }
477        }
478    }
479
480    private void processDead(String service) {
481        if (!service.equals(selfService)) {
482            RemoteBrokerData data = brokersByService.remove(service);
483            if (data != null && !data.isFailed()) {
484                fireServiceRemovedEvent(data);
485            }
486        }
487    }
488
489    private void doExpireOldServices() {
490        long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); 
491        for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
492            RemoteBrokerData data = i.next();
493            if (data.getLastHeartBeat() < expireTime) {
494                processDead(data.getServiceName());
495            }
496        }
497    }
498
499    private String getBrokerName(String str) {
500        String result = null;
501        int start = str.indexOf(DELIMITER);
502        if (start >= 0) {
503            int end = str.indexOf(DELIMITER, start + 1);
504            result = str.substring(start + 1, end);
505        }
506        return result;
507    }
508
509    public void serviceFailed(DiscoveryEvent event) throws IOException {
510        RemoteBrokerData data = brokersByService.get(event.getServiceName());
511        if (data != null && data.markFailed()) {
512            fireServiceRemovedEvent(data);
513        }
514    }
515
516    private void fireServiceRemovedEvent(final RemoteBrokerData data) {
517        if (discoveryListener != null && started.get()) {
518            // Have the listener process the event async so that
519            // he does not block this thread since we are doing time sensitive
520            // processing of events.
521            getExecutor().execute(new Runnable() {
522                public void run() {
523                    DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
524                    if (discoveryListener != null) {
525                        discoveryListener.onServiceRemove(data);
526                    }
527                }
528            });
529        }
530    }
531
532    private void fireServiceAddEvent(final RemoteBrokerData data) {
533        if (discoveryListener != null && started.get()) {
534
535            // Have the listener process the event async so that
536            // he does not block this thread since we are doing time sensitive
537            // processing of events.
538            getExecutor().execute(new Runnable() {
539                public void run() {
540                    DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
541                    if (discoveryListener != null) {
542                        discoveryListener.onServiceAdd(data);
543                    }
544                }
545            });
546        }
547    }
548
549    private ExecutorService getExecutor() {
550        if (executor == null) {
551            final String threadName = "Notifier-" + this.toString();
552            executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
553                public Thread newThread(Runnable runable) {
554                    Thread t = new Thread(runable,  threadName);
555                    t.setDaemon(true);
556                    return t;
557                }
558            });
559        }
560        return executor;
561    }
562
563    public long getBackOffMultiplier() {
564        return backOffMultiplier;
565    }
566
567    public void setBackOffMultiplier(long backOffMultiplier) {
568        this.backOffMultiplier = backOffMultiplier;
569    }
570
571    public long getInitialReconnectDelay() {
572        return initialReconnectDelay;
573    }
574
575    public void setInitialReconnectDelay(long initialReconnectDelay) {
576        this.initialReconnectDelay = initialReconnectDelay;
577    }
578
579    public int getMaxReconnectAttempts() {
580        return maxReconnectAttempts;
581    }
582
583    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
584        this.maxReconnectAttempts = maxReconnectAttempts;
585    }
586
587    public long getMaxReconnectDelay() {
588        return maxReconnectDelay;
589    }
590
591    public void setMaxReconnectDelay(long maxReconnectDelay) {
592        this.maxReconnectDelay = maxReconnectDelay;
593    }
594
595    public boolean isUseExponentialBackOff() {
596        return useExponentialBackOff;
597    }
598
599    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
600        this.useExponentialBackOff = useExponentialBackOff;
601    }
602
603    public void setGroup(String group) {
604        this.group = group;
605    }
606    
607    @Override
608    public String toString() {
609        return  "MulticastDiscoveryAgent-"
610            + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
611    }
612}