001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.discovery.multicast;
019    
020    import java.io.IOException;
021    import java.net.DatagramPacket;
022    import java.net.InetAddress;
023    import java.net.InetSocketAddress;
024    import java.net.MulticastSocket;
025    import java.net.NetworkInterface;
026    import java.net.SocketAddress;
027    import java.net.SocketTimeoutException;
028    import java.net.URI;
029    import java.util.Iterator;
030    import java.util.Map;
031    import java.util.concurrent.ConcurrentHashMap;
032    import java.util.concurrent.ExecutorService;
033    import java.util.concurrent.LinkedBlockingQueue;
034    import java.util.concurrent.ThreadFactory;
035    import java.util.concurrent.ThreadPoolExecutor;
036    import java.util.concurrent.TimeUnit;
037    import java.util.concurrent.atomic.AtomicBoolean;
038    
039    import org.apache.activemq.command.DiscoveryEvent;
040    import org.apache.activemq.transport.discovery.DiscoveryAgent;
041    import org.apache.activemq.transport.discovery.DiscoveryListener;
042    import org.apache.activemq.util.ThreadPoolUtils;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    /**
047     * A {@link DiscoveryAgent} using a multicast address and heartbeat packets
048     * encoded using any wireformat, but openwire by default.
049     * 
050     * 
051     */
052    public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
053    
054        public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
055        public static final String DEFAULT_HOST_STR = "default"; 
056        public static final String DEFAULT_HOST_IP  = System.getProperty("activemq.partition.discovery", "239.255.2.3"); 
057        public static final int    DEFAULT_PORT  = 6155; 
058            
059        private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class);
060        private static final String TYPE_SUFFIX = "ActiveMQ-4.";
061        private static final String ALIVE = "alive.";
062        private static final String DEAD = "dead.";
063        private static final String DELIMITER = "%";
064        private static final int BUFF_SIZE = 8192;
065        private static final int DEFAULT_IDLE_TIME = 500;
066        private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;
067    
068        private long initialReconnectDelay = 1000 * 5;
069        private long maxReconnectDelay = 1000 * 30;
070        private long backOffMultiplier = 2;
071        private boolean useExponentialBackOff;
072        private int maxReconnectAttempts;
073    
074        private int timeToLive = 1;
075        private boolean loopBackMode;
076        private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
077        private String group = "default";
078        private URI discoveryURI;
079        private InetAddress inetAddress;
080        private SocketAddress sockAddress;
081        private DiscoveryListener discoveryListener;
082        private String selfService;
083        private MulticastSocket mcast;
084        private Thread runner;
085        private long keepAliveInterval = DEFAULT_IDLE_TIME;
086        private String mcInterface;
087        private String mcNetworkInterface;
088        private String mcJoinNetworkInterface;
089        private long lastAdvertizeTime;
090        private AtomicBoolean started = new AtomicBoolean(false);
091        private boolean reportAdvertizeFailed = true;
092        private ExecutorService executor = null;
093    
094        class RemoteBrokerData {
095            final String brokerName;
096            final String service;
097            long lastHeartBeat;
098            long recoveryTime;
099            int failureCount;
100            boolean failed;
101    
102            public RemoteBrokerData(String brokerName, String service) {
103                this.brokerName = brokerName;
104                this.service = service;
105                this.lastHeartBeat = System.currentTimeMillis();
106            }
107    
108            public synchronized void updateHeartBeat() {
109                lastHeartBeat = System.currentTimeMillis();
110    
111                // Consider that the broker recovery has succeeded if it has not
112                // failed in 60 seconds.
113                if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
114                    if (LOG.isDebugEnabled()) {
115                        LOG.debug("I now think that the " + service + " service has recovered.");
116                    }
117                    failureCount = 0;
118                    recoveryTime = 0;
119                }
120            }
121    
122            public synchronized long getLastHeartBeat() {
123                return lastHeartBeat;
124            }
125    
126            public synchronized boolean markFailed() {
127                if (!failed) {
128                    failed = true;
129                    failureCount++;
130    
131                    long reconnectDelay;
132                    if (!useExponentialBackOff) {
133                        reconnectDelay = initialReconnectDelay;
134                    } else {
135                        reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
136                        if (reconnectDelay > maxReconnectDelay) {
137                            reconnectDelay = maxReconnectDelay;
138                        }
139                    }
140    
141                    if (LOG.isDebugEnabled()) {
142                        LOG.debug("Remote failure of " + service + " while still receiving multicast advertisements.  Advertising events will be suppressed for " + reconnectDelay
143                                  + " ms, the current failure count is: " + failureCount);
144                    }
145    
146                    recoveryTime = System.currentTimeMillis() + reconnectDelay;
147                    return true;
148                }
149                return false;
150            }
151    
152            /**
153             * @return true if this broker is marked failed and it is now the right
154             *         time to start recovery.
155             */
156            public synchronized boolean doRecovery() {
157                if (!failed) {
158                    return false;
159                }
160    
161                // Are we done trying to recover this guy?
162                if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
163                    if (LOG.isDebugEnabled()) {
164                        LOG.debug("Max reconnect attempts of the " + service + " service has been reached.");
165                    }
166                    return false;
167                }
168    
169                // Is it not yet time?
170                if (System.currentTimeMillis() < recoveryTime) {
171                    return false;
172                }
173    
174                if (LOG.isDebugEnabled()) {
175                    LOG.debug("Resuming event advertisement of the " + service + " service.");
176                }
177                failed = false;
178                return true;
179            }
180    
181            public boolean isFailed() {
182                return failed;
183            }
184        }
185    
186        /**
187         * Set the discovery listener
188         * 
189         * @param listener
190         */
191        public void setDiscoveryListener(DiscoveryListener listener) {
192            this.discoveryListener = listener;
193        }
194    
195        /**
196         * register a service
197         */
198        public void registerService(String name) throws IOException {
199            this.selfService = name;
200            if (started.get()) {
201                doAdvertizeSelf();
202            }
203        }
204    
205        /**
206         * @return Returns the loopBackMode.
207         */
208        public boolean isLoopBackMode() {
209            return loopBackMode;
210        }
211    
212        /**
213         * @param loopBackMode The loopBackMode to set.
214         */
215        public void setLoopBackMode(boolean loopBackMode) {
216            this.loopBackMode = loopBackMode;
217        }
218    
219        /**
220         * @return Returns the timeToLive.
221         */
222        public int getTimeToLive() {
223            return timeToLive;
224        }
225    
226        /**
227         * @param timeToLive The timeToLive to set.
228         */
229        public void setTimeToLive(int timeToLive) {
230            this.timeToLive = timeToLive;
231        }
232    
233        /**
234         * @return the discoveryURI
235         */
236        public URI getDiscoveryURI() {
237            return discoveryURI;
238        }
239    
240        /**
241         * Set the discoveryURI
242         * 
243         * @param discoveryURI
244         */
245        public void setDiscoveryURI(URI discoveryURI) {
246            this.discoveryURI = discoveryURI;
247        }
248    
249        public long getKeepAliveInterval() {
250            return keepAliveInterval;
251        }
252    
253        public void setKeepAliveInterval(long keepAliveInterval) {
254            this.keepAliveInterval = keepAliveInterval;
255        }
256        
257        public void setInterface(String mcInterface) {
258            this.mcInterface = mcInterface;
259        }
260        
261        public void setNetworkInterface(String mcNetworkInterface) {
262            this.mcNetworkInterface = mcNetworkInterface;    
263        }
264        
265        public void setJoinNetworkInterface(String mcJoinNetwrokInterface) {
266            this.mcJoinNetworkInterface = mcJoinNetwrokInterface;
267        }
268        
269        /**
270         * start the discovery agent
271         * 
272         * @throws Exception
273         */
274        public void start() throws Exception {
275            
276            if (started.compareAndSet(false, true)) {               
277                                    
278                if (group == null || group.length() == 0) {
279                    throw new IOException("You must specify a group to discover");
280                }
281                String type = getType();
282                if (!type.endsWith(".")) {
283                    LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
284                    type += ".";
285                }
286                
287                if (discoveryURI == null) {
288                    discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
289                }
290                
291                if (LOG.isTraceEnabled()) 
292                            LOG.trace("start - discoveryURI = " + discoveryURI);                                      
293                      
294                      String myHost = discoveryURI.getHost();
295                      int    myPort = discoveryURI.getPort(); 
296                         
297                      if( DEFAULT_HOST_STR.equals(myHost) ) 
298                            myHost = DEFAULT_HOST_IP;                         
299                      
300                      if(myPort < 0 )
301                        myPort = DEFAULT_PORT;                  
302                      
303                      if (LOG.isTraceEnabled()) {
304                            LOG.trace("start - myHost = " + myHost); 
305                            LOG.trace("start - myPort = " + myPort);        
306                            LOG.trace("start - group  = " + group );                                
307                            LOG.trace("start - interface  = " + mcInterface );
308                            LOG.trace("start - network interface  = " + mcNetworkInterface );
309                            LOG.trace("start - join network interface  = " + mcJoinNetworkInterface );
310                      }     
311                      
312                this.inetAddress = InetAddress.getByName(myHost);
313                this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
314                mcast = new MulticastSocket(myPort);
315                mcast.setLoopbackMode(loopBackMode);
316                mcast.setTimeToLive(getTimeToLive());
317                if (mcJoinNetworkInterface != null) {
318                    mcast.joinGroup(sockAddress, NetworkInterface.getByName(mcJoinNetworkInterface));
319                }
320                else {
321                    mcast.joinGroup(inetAddress);
322                }
323                mcast.setSoTimeout((int)keepAliveInterval);
324                if (mcInterface != null) {
325                    mcast.setInterface(InetAddress.getByName(mcInterface));
326                }
327                if (mcNetworkInterface != null) {
328                    mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
329                }
330                runner = new Thread(this);
331                runner.setName(this.toString() + ":" + runner.getName());
332                runner.setDaemon(true);
333                runner.start();
334                doAdvertizeSelf();
335            }
336        }
337    
338        /**
339         * stop the channel
340         * 
341         * @throws Exception
342         */
343        public void stop() throws Exception {
344            if (started.compareAndSet(true, false)) {
345                doAdvertizeSelf();
346                if (mcast != null) {
347                    mcast.close();
348                }
349                if (runner != null) {
350                    runner.interrupt();
351                }
352                if (executor != null) {
353                    ThreadPoolUtils.shutdownNow(executor);
354                    executor = null;
355                }
356            }
357        }
358    
359        public String getType() {
360            return group + "." + TYPE_SUFFIX;
361        }
362    
363        public void run() {
364            byte[] buf = new byte[BUFF_SIZE];
365            DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
366            while (started.get()) {
367                doTimeKeepingServices();
368                try {
369                    mcast.receive(packet);
370                    if (packet.getLength() > 0) {
371                        String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
372                        processData(str);
373                    }
374                } catch (SocketTimeoutException se) {
375                    // ignore
376                } catch (IOException e) {
377                    if (started.get()) {
378                        LOG.error("failed to process packet: " + e);
379                    }
380                }
381            }
382        }
383    
384        private void processData(String str) {
385            if (discoveryListener != null) {
386                if (str.startsWith(getType())) {
387                    String payload = str.substring(getType().length());
388                    if (payload.startsWith(ALIVE)) {
389                        String brokerName = getBrokerName(payload.substring(ALIVE.length()));
390                        String service = payload.substring(ALIVE.length() + brokerName.length() + 2);
391                        processAlive(brokerName, service);
392                    } else {
393                        String brokerName = getBrokerName(payload.substring(DEAD.length()));
394                        String service = payload.substring(DEAD.length() + brokerName.length() + 2);
395                        processDead(service);
396                    }
397                }
398            }
399        }
400    
401        private void doTimeKeepingServices() {
402            if (started.get()) {
403                long currentTime = System.currentTimeMillis();
404                if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
405                    doAdvertizeSelf();
406                    lastAdvertizeTime = currentTime;
407                }
408                doExpireOldServices();
409            }
410        }
411    
412        private void doAdvertizeSelf() {
413            if (selfService != null) {
414                String payload = getType();
415                payload += started.get() ? ALIVE : DEAD;
416                payload += DELIMITER + "localhost" + DELIMITER;
417                payload += selfService;
418                try {
419                    byte[] data = payload.getBytes();
420                    DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
421                    mcast.send(packet);
422                } catch (IOException e) {
423                    // If a send fails, chances are all subsequent sends will fail
424                    // too.. No need to keep reporting the
425                    // same error over and over.
426                    if (reportAdvertizeFailed) {
427                        reportAdvertizeFailed = false;
428                        LOG.error("Failed to advertise our service: " + payload, e);
429                        if ("Operation not permitted".equals(e.getMessage())) {
430                            LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
431                                      + "Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress());
432                        }
433                    }
434                }
435            }
436        }
437    
438        private void processAlive(String brokerName, String service) {
439            if (selfService == null || !service.equals(selfService)) {
440                RemoteBrokerData data = brokersByService.get(service);
441                if (data == null) {
442                    data = new RemoteBrokerData(brokerName, service);
443                    brokersByService.put(service, data);      
444                    fireServiceAddEvent(data);
445                    doAdvertizeSelf();
446                } else {
447                    data.updateHeartBeat();
448                    if (data.doRecovery()) {
449                        fireServiceAddEvent(data);
450                    }
451                }
452            }
453        }
454    
455        private void processDead(String service) {
456            if (!service.equals(selfService)) {
457                RemoteBrokerData data = brokersByService.remove(service);
458                if (data != null && !data.isFailed()) {
459                    fireServiceRemovedEvent(data);
460                }
461            }
462        }
463    
464        private void doExpireOldServices() {
465            long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); 
466            for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
467                RemoteBrokerData data = i.next();
468                if (data.getLastHeartBeat() < expireTime) {
469                    processDead(data.service);
470                }
471            }
472        }
473    
474        private String getBrokerName(String str) {
475            String result = null;
476            int start = str.indexOf(DELIMITER);
477            if (start >= 0) {
478                int end = str.indexOf(DELIMITER, start + 1);
479                result = str.substring(start + 1, end);
480            }
481            return result;
482        }
483    
484        public void serviceFailed(DiscoveryEvent event) throws IOException {
485            RemoteBrokerData data = brokersByService.get(event.getServiceName());
486            if (data != null && data.markFailed()) {
487                fireServiceRemovedEvent(data);
488            }
489        }
490    
491        private void fireServiceRemovedEvent(RemoteBrokerData data) {
492            if (discoveryListener != null && started.get()) {
493                final DiscoveryEvent event = new DiscoveryEvent(data.service);
494                event.setBrokerName(data.brokerName);
495    
496                // Have the listener process the event async so that
497                // he does not block this thread since we are doing time sensitive
498                // processing of events.
499                getExecutor().execute(new Runnable() {
500                    public void run() {
501                        DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
502                        if (discoveryListener != null) {
503                            discoveryListener.onServiceRemove(event);
504                        }
505                    }
506                });
507            }
508        }
509    
510        private void fireServiceAddEvent(RemoteBrokerData data) {
511            if (discoveryListener != null && started.get()) {
512                final DiscoveryEvent event = new DiscoveryEvent(data.service);
513                event.setBrokerName(data.brokerName);
514                
515                // Have the listener process the event async so that
516                // he does not block this thread since we are doing time sensitive
517                // processing of events.
518                getExecutor().execute(new Runnable() {
519                    public void run() {
520                        DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
521                        if (discoveryListener != null) {
522                            discoveryListener.onServiceAdd(event);
523                        }
524                    }
525                });
526            }
527        }
528    
529        private ExecutorService getExecutor() {
530            if (executor == null) {
531                final String threadName = "Notifier-" + this.toString();
532                executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
533                    public Thread newThread(Runnable runable) {
534                        Thread t = new Thread(runable,  threadName);
535                        t.setDaemon(true);
536                        return t;
537                    }
538                });
539            }
540            return executor;
541        }
542    
543        public long getBackOffMultiplier() {
544            return backOffMultiplier;
545        }
546    
547        public void setBackOffMultiplier(long backOffMultiplier) {
548            this.backOffMultiplier = backOffMultiplier;
549        }
550    
551        public long getInitialReconnectDelay() {
552            return initialReconnectDelay;
553        }
554    
555        public void setInitialReconnectDelay(long initialReconnectDelay) {
556            this.initialReconnectDelay = initialReconnectDelay;
557        }
558    
559        public int getMaxReconnectAttempts() {
560            return maxReconnectAttempts;
561        }
562    
563        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
564            this.maxReconnectAttempts = maxReconnectAttempts;
565        }
566    
567        public long getMaxReconnectDelay() {
568            return maxReconnectDelay;
569        }
570    
571        public void setMaxReconnectDelay(long maxReconnectDelay) {
572            this.maxReconnectDelay = maxReconnectDelay;
573        }
574    
575        public boolean isUseExponentialBackOff() {
576            return useExponentialBackOff;
577        }
578    
579        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
580            this.useExponentialBackOff = useExponentialBackOff;
581        }
582    
583        public void setGroup(String group) {
584            this.group = group;
585        }
586        
587        @Override
588        public String toString() {
589            return  "MulticastDiscoveryAgent-"
590                + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
591        }
592    }