001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.discovery.multicast;
019    
020    import java.io.IOException;
021    import java.net.DatagramPacket;
022    import java.net.InetAddress;
023    import java.net.InetSocketAddress;
024    import java.net.MulticastSocket;
025    import java.net.NetworkInterface;
026    import java.net.SocketAddress;
027    import java.net.SocketTimeoutException;
028    import java.net.URI;
029    import java.util.Iterator;
030    import java.util.Map;
031    import java.util.concurrent.ConcurrentHashMap;
032    import java.util.concurrent.ExecutorService;
033    import java.util.concurrent.LinkedBlockingQueue;
034    import java.util.concurrent.ThreadFactory;
035    import java.util.concurrent.ThreadPoolExecutor;
036    import java.util.concurrent.TimeUnit;
037    import java.util.concurrent.atomic.AtomicBoolean;
038    
039    import org.apache.activemq.command.DiscoveryEvent;
040    import org.apache.activemq.transport.discovery.DiscoveryAgent;
041    import org.apache.activemq.transport.discovery.DiscoveryListener;
042    import org.apache.activemq.util.ThreadPoolUtils;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    /**
047     * A {@link DiscoveryAgent} using a multicast address and heartbeat packets
048     * encoded using any wireformat, but openwire by default.
049     * 
050     * 
051     */
052    public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
053    
054        public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
055        public static final String DEFAULT_HOST_STR = "default"; 
056        public static final String DEFAULT_HOST_IP  = System.getProperty("activemq.partition.discovery", "239.255.2.3"); 
057        public static final int    DEFAULT_PORT  = 6155; 
058            
059        private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class);
060        private static final String TYPE_SUFFIX = "ActiveMQ-4.";
061        private static final String ALIVE = "alive.";
062        private static final String DEAD = "dead.";
063        private static final String DELIMITER = "%";
064        private static final int BUFF_SIZE = 8192;
065        private static final int DEFAULT_IDLE_TIME = 500;
066        private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;
067    
068        private long initialReconnectDelay = 1000 * 5;
069        private long maxReconnectDelay = 1000 * 30;
070        private long backOffMultiplier = 2;
071        private boolean useExponentialBackOff;
072        private int maxReconnectAttempts;
073    
074        private int timeToLive = 1;
075        private boolean loopBackMode;
076        private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
077        private String group = "default";
078        private URI discoveryURI;
079        private InetAddress inetAddress;
080        private SocketAddress sockAddress;
081        private DiscoveryListener discoveryListener;
082        private String selfService;
083        private MulticastSocket mcast;
084        private Thread runner;
085        private long keepAliveInterval = DEFAULT_IDLE_TIME;
086        private String mcInterface;
087        private String mcNetworkInterface;
088        private String mcJoinNetworkInterface;
089        private long lastAdvertizeTime;
090        private AtomicBoolean started = new AtomicBoolean(false);
091        private boolean reportAdvertizeFailed = true;
092        private ExecutorService executor = null;
093    
094        class RemoteBrokerData extends DiscoveryEvent {
095            long lastHeartBeat;
096            long recoveryTime;
097            int failureCount;
098            boolean failed;
099    
100            public RemoteBrokerData(String brokerName, String service) {
101                super(service);
102                setBrokerName(brokerName);
103                this.lastHeartBeat = System.currentTimeMillis();
104            }
105    
106            public synchronized void updateHeartBeat() {
107                lastHeartBeat = System.currentTimeMillis();
108    
109                // Consider that the broker recovery has succeeded if it has not
110                // failed in 60 seconds.
111                if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
112                    if (LOG.isDebugEnabled()) {
113                        LOG.debug("I now think that the " + serviceName + " service has recovered.");
114                    }
115                    failureCount = 0;
116                    recoveryTime = 0;
117                }
118            }
119    
120            public synchronized long getLastHeartBeat() {
121                return lastHeartBeat;
122            }
123    
124            public synchronized boolean markFailed() {
125                if (!failed) {
126                    failed = true;
127                    failureCount++;
128    
129                    long reconnectDelay;
130                    if (!useExponentialBackOff) {
131                        reconnectDelay = initialReconnectDelay;
132                    } else {
133                        reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
134                        if (reconnectDelay > maxReconnectDelay) {
135                            reconnectDelay = maxReconnectDelay;
136                        }
137                    }
138    
139                    if (LOG.isDebugEnabled()) {
140                        LOG.debug("Remote failure of " + serviceName + " while still receiving multicast advertisements.  Advertising events will be suppressed for " + reconnectDelay
141                                  + " ms, the current failure count is: " + failureCount);
142                    }
143    
144                    recoveryTime = System.currentTimeMillis() + reconnectDelay;
145                    return true;
146                }
147                return false;
148            }
149    
150            /**
151             * @return true if this broker is marked failed and it is now the right
152             *         time to start recovery.
153             */
154            public synchronized boolean doRecovery() {
155                if (!failed) {
156                    return false;
157                }
158    
159                // Are we done trying to recover this guy?
160                if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
161                    if (LOG.isDebugEnabled()) {
162                        LOG.debug("Max reconnect attempts of the " + serviceName + " service has been reached.");
163                    }
164                    return false;
165                }
166    
167                // Is it not yet time?
168                if (System.currentTimeMillis() < recoveryTime) {
169                    return false;
170                }
171    
172                if (LOG.isDebugEnabled()) {
173                    LOG.debug("Resuming event advertisement of the " + serviceName + " service.");
174                }
175                failed = false;
176                return true;
177            }
178    
179            public boolean isFailed() {
180                return failed;
181            }
182        }
183    
184        /**
185         * Set the discovery listener
186         * 
187         * @param listener
188         */
189        public void setDiscoveryListener(DiscoveryListener listener) {
190            this.discoveryListener = listener;
191        }
192    
193        /**
194         * register a service
195         */
196        public void registerService(String name) throws IOException {
197            this.selfService = name;
198            if (started.get()) {
199                doAdvertizeSelf();
200            }
201        }
202    
203        /**
204         * @return Returns the loopBackMode.
205         */
206        public boolean isLoopBackMode() {
207            return loopBackMode;
208        }
209    
210        /**
211         * @param loopBackMode The loopBackMode to set.
212         */
213        public void setLoopBackMode(boolean loopBackMode) {
214            this.loopBackMode = loopBackMode;
215        }
216    
217        /**
218         * @return Returns the timeToLive.
219         */
220        public int getTimeToLive() {
221            return timeToLive;
222        }
223    
224        /**
225         * @param timeToLive The timeToLive to set.
226         */
227        public void setTimeToLive(int timeToLive) {
228            this.timeToLive = timeToLive;
229        }
230    
231        /**
232         * @return the discoveryURI
233         */
234        public URI getDiscoveryURI() {
235            return discoveryURI;
236        }
237    
238        /**
239         * Set the discoveryURI
240         * 
241         * @param discoveryURI
242         */
243        public void setDiscoveryURI(URI discoveryURI) {
244            this.discoveryURI = discoveryURI;
245        }
246    
247        public long getKeepAliveInterval() {
248            return keepAliveInterval;
249        }
250    
251        public void setKeepAliveInterval(long keepAliveInterval) {
252            this.keepAliveInterval = keepAliveInterval;
253        }
254        
255        public void setInterface(String mcInterface) {
256            this.mcInterface = mcInterface;
257        }
258        
259        public void setNetworkInterface(String mcNetworkInterface) {
260            this.mcNetworkInterface = mcNetworkInterface;    
261        }
262        
263        public void setJoinNetworkInterface(String mcJoinNetwrokInterface) {
264            this.mcJoinNetworkInterface = mcJoinNetwrokInterface;
265        }
266        
267        /**
268         * start the discovery agent
269         * 
270         * @throws Exception
271         */
272        public void start() throws Exception {
273            
274            if (started.compareAndSet(false, true)) {               
275                                    
276                if (group == null || group.length() == 0) {
277                    throw new IOException("You must specify a group to discover");
278                }
279                String type = getType();
280                if (!type.endsWith(".")) {
281                    LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
282                    type += ".";
283                }
284                
285                if (discoveryURI == null) {
286                    discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
287                }
288                
289                if (LOG.isTraceEnabled()) 
290                            LOG.trace("start - discoveryURI = " + discoveryURI);                                      
291                      
292                      String myHost = discoveryURI.getHost();
293                      int    myPort = discoveryURI.getPort(); 
294                         
295                      if( DEFAULT_HOST_STR.equals(myHost) ) 
296                            myHost = DEFAULT_HOST_IP;                         
297                      
298                      if(myPort < 0 )
299                        myPort = DEFAULT_PORT;                  
300                      
301                      if (LOG.isTraceEnabled()) {
302                            LOG.trace("start - myHost = " + myHost); 
303                            LOG.trace("start - myPort = " + myPort);        
304                            LOG.trace("start - group  = " + group );                                
305                            LOG.trace("start - interface  = " + mcInterface );
306                            LOG.trace("start - network interface  = " + mcNetworkInterface );
307                            LOG.trace("start - join network interface  = " + mcJoinNetworkInterface );
308                      }     
309                      
310                this.inetAddress = InetAddress.getByName(myHost);
311                this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
312                mcast = new MulticastSocket(myPort);
313                mcast.setLoopbackMode(loopBackMode);
314                mcast.setTimeToLive(getTimeToLive());
315                if (mcJoinNetworkInterface != null) {
316                    mcast.joinGroup(sockAddress, NetworkInterface.getByName(mcJoinNetworkInterface));
317                }
318                else {
319                    mcast.joinGroup(inetAddress);
320                }
321                mcast.setSoTimeout((int)keepAliveInterval);
322                if (mcInterface != null) {
323                    mcast.setInterface(InetAddress.getByName(mcInterface));
324                }
325                if (mcNetworkInterface != null) {
326                    mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
327                }
328                runner = new Thread(this);
329                runner.setName(this.toString() + ":" + runner.getName());
330                runner.setDaemon(true);
331                runner.start();
332                doAdvertizeSelf();
333            }
334        }
335    
336        /**
337         * stop the channel
338         * 
339         * @throws Exception
340         */
341        public void stop() throws Exception {
342            if (started.compareAndSet(true, false)) {
343                doAdvertizeSelf();
344                if (mcast != null) {
345                    mcast.close();
346                }
347                if (runner != null) {
348                    runner.interrupt();
349                }
350                if (executor != null) {
351                    ThreadPoolUtils.shutdownNow(executor);
352                    executor = null;
353                }
354            }
355        }
356    
357        public String getType() {
358            return group + "." + TYPE_SUFFIX;
359        }
360    
361        public void run() {
362            byte[] buf = new byte[BUFF_SIZE];
363            DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
364            while (started.get()) {
365                doTimeKeepingServices();
366                try {
367                    mcast.receive(packet);
368                    if (packet.getLength() > 0) {
369                        String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
370                        processData(str);
371                    }
372                } catch (SocketTimeoutException se) {
373                    // ignore
374                } catch (IOException e) {
375                    if (started.get()) {
376                        LOG.error("failed to process packet: " + e);
377                    }
378                }
379            }
380        }
381    
382        private void processData(String str) {
383            if (discoveryListener != null) {
384                if (str.startsWith(getType())) {
385                    String payload = str.substring(getType().length());
386                    if (payload.startsWith(ALIVE)) {
387                        String brokerName = getBrokerName(payload.substring(ALIVE.length()));
388                        String service = payload.substring(ALIVE.length() + brokerName.length() + 2);
389                        processAlive(brokerName, service);
390                    } else {
391                        String brokerName = getBrokerName(payload.substring(DEAD.length()));
392                        String service = payload.substring(DEAD.length() + brokerName.length() + 2);
393                        processDead(service);
394                    }
395                }
396            }
397        }
398    
399        private void doTimeKeepingServices() {
400            if (started.get()) {
401                long currentTime = System.currentTimeMillis();
402                if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
403                    doAdvertizeSelf();
404                    lastAdvertizeTime = currentTime;
405                }
406                doExpireOldServices();
407            }
408        }
409    
410        private void doAdvertizeSelf() {
411            if (selfService != null) {
412                String payload = getType();
413                payload += started.get() ? ALIVE : DEAD;
414                payload += DELIMITER + "localhost" + DELIMITER;
415                payload += selfService;
416                try {
417                    byte[] data = payload.getBytes();
418                    DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
419                    mcast.send(packet);
420                } catch (IOException e) {
421                    // If a send fails, chances are all subsequent sends will fail
422                    // too.. No need to keep reporting the
423                    // same error over and over.
424                    if (reportAdvertizeFailed) {
425                        reportAdvertizeFailed = false;
426                        LOG.error("Failed to advertise our service: " + payload, e);
427                        if ("Operation not permitted".equals(e.getMessage())) {
428                            LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
429                                      + "Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress());
430                        }
431                    }
432                }
433            }
434        }
435    
436        private void processAlive(String brokerName, String service) {
437            if (selfService == null || !service.equals(selfService)) {
438                RemoteBrokerData data = brokersByService.get(service);
439                if (data == null) {
440                    data = new RemoteBrokerData(brokerName, service);
441                    brokersByService.put(service, data);      
442                    fireServiceAddEvent(data);
443                    doAdvertizeSelf();
444                } else {
445                    data.updateHeartBeat();
446                    if (data.doRecovery()) {
447                        fireServiceAddEvent(data);
448                    }
449                }
450            }
451        }
452    
453        private void processDead(String service) {
454            if (!service.equals(selfService)) {
455                RemoteBrokerData data = brokersByService.remove(service);
456                if (data != null && !data.isFailed()) {
457                    fireServiceRemovedEvent(data);
458                }
459            }
460        }
461    
462        private void doExpireOldServices() {
463            long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); 
464            for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
465                RemoteBrokerData data = i.next();
466                if (data.getLastHeartBeat() < expireTime) {
467                    processDead(data.getServiceName());
468                }
469            }
470        }
471    
472        private String getBrokerName(String str) {
473            String result = null;
474            int start = str.indexOf(DELIMITER);
475            if (start >= 0) {
476                int end = str.indexOf(DELIMITER, start + 1);
477                result = str.substring(start + 1, end);
478            }
479            return result;
480        }
481    
482        public void serviceFailed(DiscoveryEvent event) throws IOException {
483            RemoteBrokerData data = brokersByService.get(event.getServiceName());
484            if (data != null && data.markFailed()) {
485                fireServiceRemovedEvent(data);
486            }
487        }
488    
489        private void fireServiceRemovedEvent(final RemoteBrokerData data) {
490            if (discoveryListener != null && started.get()) {
491                // Have the listener process the event async so that
492                // he does not block this thread since we are doing time sensitive
493                // processing of events.
494                getExecutor().execute(new Runnable() {
495                    public void run() {
496                        DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
497                        if (discoveryListener != null) {
498                            discoveryListener.onServiceRemove(data);
499                        }
500                    }
501                });
502            }
503        }
504    
505        private void fireServiceAddEvent(final RemoteBrokerData data) {
506            if (discoveryListener != null && started.get()) {
507    
508                // Have the listener process the event async so that
509                // he does not block this thread since we are doing time sensitive
510                // processing of events.
511                getExecutor().execute(new Runnable() {
512                    public void run() {
513                        DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
514                        if (discoveryListener != null) {
515                            discoveryListener.onServiceAdd(data);
516                        }
517                    }
518                });
519            }
520        }
521    
522        private ExecutorService getExecutor() {
523            if (executor == null) {
524                final String threadName = "Notifier-" + this.toString();
525                executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
526                    public Thread newThread(Runnable runable) {
527                        Thread t = new Thread(runable,  threadName);
528                        t.setDaemon(true);
529                        return t;
530                    }
531                });
532            }
533            return executor;
534        }
535    
536        public long getBackOffMultiplier() {
537            return backOffMultiplier;
538        }
539    
540        public void setBackOffMultiplier(long backOffMultiplier) {
541            this.backOffMultiplier = backOffMultiplier;
542        }
543    
544        public long getInitialReconnectDelay() {
545            return initialReconnectDelay;
546        }
547    
548        public void setInitialReconnectDelay(long initialReconnectDelay) {
549            this.initialReconnectDelay = initialReconnectDelay;
550        }
551    
552        public int getMaxReconnectAttempts() {
553            return maxReconnectAttempts;
554        }
555    
556        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
557            this.maxReconnectAttempts = maxReconnectAttempts;
558        }
559    
560        public long getMaxReconnectDelay() {
561            return maxReconnectDelay;
562        }
563    
564        public void setMaxReconnectDelay(long maxReconnectDelay) {
565            this.maxReconnectDelay = maxReconnectDelay;
566        }
567    
568        public boolean isUseExponentialBackOff() {
569            return useExponentialBackOff;
570        }
571    
572        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
573            this.useExponentialBackOff = useExponentialBackOff;
574        }
575    
576        public void setGroup(String group) {
577            this.group = group;
578        }
579        
580        @Override
581        public String toString() {
582            return  "MulticastDiscoveryAgent-"
583                + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
584        }
585    }