001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.discovery.http;
018    
019    import java.io.IOException;
020    import java.util.HashMap;
021    import java.util.HashSet;
022    import java.util.Map;
023    import java.util.Scanner;
024    import java.util.Set;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    import java.util.concurrent.atomic.AtomicInteger;
027    import java.util.concurrent.atomic.AtomicReference;
028    
029    import org.apache.activemq.Service;
030    import org.apache.activemq.command.DiscoveryEvent;
031    import org.apache.activemq.transport.discovery.DiscoveryAgent;
032    import org.apache.activemq.transport.discovery.DiscoveryListener;
033    import org.apache.activemq.util.IntrospectionSupport;
034    import org.apache.http.client.HttpClient;
035    import org.apache.http.client.ResponseHandler;
036    import org.apache.http.client.methods.HttpDelete;
037    import org.apache.http.client.methods.HttpGet;
038    import org.apache.http.client.methods.HttpPut;
039    import org.apache.http.impl.client.BasicResponseHandler;
040    import org.apache.http.impl.client.DefaultHttpClient;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
044    public class HTTPDiscoveryAgent implements DiscoveryAgent {
045    
046        private static final Logger LOG = LoggerFactory.getLogger(HTTPDiscoveryAgent.class);
047    
048        private String registryURL = "http://localhost:8080/discovery-registry/default";
049        private HttpClient httpClient = new DefaultHttpClient();
050        private AtomicBoolean running = new AtomicBoolean();
051        private final AtomicReference<DiscoveryListener> discoveryListener = new AtomicReference<DiscoveryListener>();
052        private final HashSet<String> registeredServices = new HashSet<String>();
053        private final HashMap<String, SimpleDiscoveryEvent> discoveredServices = new HashMap<String, SimpleDiscoveryEvent>();
054        private Thread thread;
055        private long updateInterval = 1000 * 10;
056        @SuppressWarnings("unused")
057        private String brokerName;
058        private boolean startEmbeddRegistry = false;
059        private Service jetty;
060        private AtomicInteger startCounter = new AtomicInteger(0);
061    
062        private long initialReconnectDelay = 1000;
063        private long maxReconnectDelay = 1000 * 30;
064        private long backOffMultiplier = 2;
065        private boolean useExponentialBackOff = true;
066        private int maxReconnectAttempts;
067        private final Object sleepMutex = new Object();
068        private long minConnectTime = 5000;
069    
070        class SimpleDiscoveryEvent extends DiscoveryEvent {
071    
072            private int connectFailures;
073            private long reconnectDelay = initialReconnectDelay;
074            private long connectTime = System.currentTimeMillis();
075            private AtomicBoolean failed = new AtomicBoolean(false);
076            private AtomicBoolean removed = new AtomicBoolean(false);
077    
078            public SimpleDiscoveryEvent(String service) {
079                super(service);
080            }
081        }
082    
083        public String getGroup() {
084            return null;
085        }
086    
087        public void registerService(String service) throws IOException {
088            synchronized (registeredServices) {
089                registeredServices.add(service);
090            }
091            doRegister(service);
092        }
093    
094        synchronized private void doRegister(String service) {
095            String url = registryURL;
096            try {
097                HttpPut method = new HttpPut(url);
098                method.addHeader("service", service);
099                ResponseHandler<String> handler = new BasicResponseHandler();
100                String responseBody = httpClient.execute(method, handler);
101                LOG.debug("PUT to " + url + " got a " + responseBody);
102            } catch (Exception e) {
103                LOG.debug("PUT to " + url + " failed with: " + e);
104            }
105        }
106    
107        @SuppressWarnings("unused")
108        synchronized private void doUnRegister(String service) {
109            String url = registryURL;
110            try {
111                HttpDelete method = new HttpDelete(url);
112                method.addHeader("service", service);
113                ResponseHandler<String> handler = new BasicResponseHandler();
114                String responseBody = httpClient.execute(method, handler);
115                LOG.debug("DELETE to " + url + " got a " + responseBody);
116            } catch (Exception e) {
117                LOG.debug("DELETE to " + url + " failed with: " + e);
118            }
119        }
120    
121        synchronized private Set<String> doLookup(long freshness) {
122            String url = registryURL + "?freshness=" + freshness;
123            try {
124                HttpGet method = new HttpGet(url);
125                ResponseHandler<String> handler = new BasicResponseHandler();
126                String response = httpClient.execute(method, handler);
127                LOG.debug("GET to " + url + " got a " + response);
128                Set<String> rc = new HashSet<String>();
129                Scanner scanner = new Scanner(response);
130                while (scanner.hasNextLine()) {
131                    String service = scanner.nextLine();
132                    if (service.trim().length() != 0) {
133                        rc.add(service);
134                    }
135                }
136                return rc;
137            } catch (Exception e) {
138                LOG.debug("GET to " + url + " failed with: " + e);
139                return null;
140            }
141        }
142    
143        public void serviceFailed(DiscoveryEvent devent) throws IOException {
144    
145            final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent) devent;
146            if (event.failed.compareAndSet(false, true)) {
147                discoveryListener.get().onServiceRemove(event);
148                if (!event.removed.get()) {
149                    // Setup a thread to re-raise the event...
150                    Thread thread = new Thread() {
151                        public void run() {
152    
153                            // We detect a failed connection attempt because the
154                            // service
155                            // fails right away.
156                            if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
157                                LOG.debug("Failure occured soon after the discovery event was generated.  " +
158                                          "It will be clasified as a connection failure: " + event);
159    
160                                event.connectFailures++;
161    
162                                if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
163                                    LOG.debug("Reconnect attempts exceeded " + maxReconnectAttempts +
164                                              " tries.  Reconnecting has been disabled.");
165                                    return;
166                                }
167    
168                                synchronized (sleepMutex) {
169                                    try {
170                                        if (!running.get() || event.removed.get()) {
171                                            return;
172                                        }
173                                        LOG.debug("Waiting " + event.reconnectDelay +
174                                                  " ms before attepting to reconnect.");
175                                        sleepMutex.wait(event.reconnectDelay);
176                                    } catch (InterruptedException ie) {
177                                        Thread.currentThread().interrupt();
178                                        return;
179                                    }
180                                }
181    
182                                if (!useExponentialBackOff) {
183                                    event.reconnectDelay = initialReconnectDelay;
184                                } else {
185                                    // Exponential increment of reconnect delay.
186                                    event.reconnectDelay *= backOffMultiplier;
187                                    if (event.reconnectDelay > maxReconnectDelay) {
188                                        event.reconnectDelay = maxReconnectDelay;
189                                    }
190                                }
191    
192                            } else {
193                                event.connectFailures = 0;
194                                event.reconnectDelay = initialReconnectDelay;
195                            }
196    
197                            if (!running.get() || event.removed.get()) {
198                                return;
199                            }
200    
201                            event.connectTime = System.currentTimeMillis();
202                            event.failed.set(false);
203                            discoveryListener.get().onServiceAdd(event);
204                        }
205                    };
206                    thread.setDaemon(true);
207                    thread.start();
208                }
209            }
210        }
211    
212        public void setBrokerName(String brokerName) {
213            this.brokerName = brokerName;
214        }
215    
216        public void setDiscoveryListener(DiscoveryListener discoveryListener) {
217            this.discoveryListener.set(discoveryListener);
218        }
219    
220        public void setGroup(String group) {
221        }
222    
223        public void start() throws Exception {
224            if (startCounter.addAndGet(1) == 1) {
225                if (startEmbeddRegistry) {
226                    jetty = createEmbeddedJettyServer();
227                    Map<String, Object> props = new HashMap<String, Object>();
228                    props.put("agent", this);
229                    IntrospectionSupport.setProperties(jetty, props);
230                    jetty.start();
231                }
232    
233                running.set(true);
234                thread = new Thread("HTTPDiscovery Agent") {
235                    @Override
236                    public void run() {
237                        while (running.get()) {
238                            try {
239                                update();
240                                Thread.sleep(updateInterval);
241                            } catch (InterruptedException e) {
242                                return;
243                            }
244                        }
245                    }
246                };
247                thread.setDaemon(true);
248                thread.start();
249            }
250        }
251    
252        /**
253         * Create the EmbeddedJettyServer instance via reflection so that we can
254         * avoid a hard runtime dependency on jetty.
255         *
256         * @return
257         * @throws Exception
258         */
259        private Service createEmbeddedJettyServer() throws Exception {
260            Class<?> clazz = HTTPDiscoveryAgent.class.getClassLoader().loadClass("org.apache.activemq.transport.discovery.http.EmbeddedJettyServer");
261            return (Service) clazz.newInstance();
262        }
263    
264        private void update() {
265            // Register all our services...
266            synchronized (registeredServices) {
267                for (String service : registeredServices) {
268                    doRegister(service);
269                }
270            }
271    
272            // Find new registered services...
273            DiscoveryListener discoveryListener = this.discoveryListener.get();
274            if (discoveryListener != null) {
275                Set<String> activeServices = doLookup(updateInterval * 3);
276                // If there is error talking the the central server, then
277                // activeServices == null
278                if (activeServices != null) {
279                    synchronized (discoveredServices) {
280    
281                        HashSet<String> removedServices = new HashSet<String>(discoveredServices.keySet());
282                        removedServices.removeAll(activeServices);
283    
284                        HashSet<String> addedServices = new HashSet<String>(activeServices);
285                        addedServices.removeAll(discoveredServices.keySet());
286                        addedServices.removeAll(removedServices);
287    
288                        for (String service : addedServices) {
289                            SimpleDiscoveryEvent e = new SimpleDiscoveryEvent(service);
290                            discoveredServices.put(service, e);
291                            discoveryListener.onServiceAdd(e);
292                        }
293    
294                        for (String service : removedServices) {
295                            SimpleDiscoveryEvent e = discoveredServices.remove(service);
296                            if (e != null) {
297                                e.removed.set(true);
298                            }
299                            discoveryListener.onServiceRemove(e);
300                        }
301                    }
302                }
303            }
304        }
305    
306        public void stop() throws Exception {
307            if (startCounter.decrementAndGet() == 0) {
308                running.set(false);
309                if (thread != null) {
310                    thread.join(updateInterval * 3);
311                    thread = null;
312                }
313                if (jetty != null) {
314                    jetty.stop();
315                    jetty = null;
316                }
317            }
318        }
319    
320        public String getRegistryURL() {
321            return registryURL;
322        }
323    
324        public void setRegistryURL(String discoveryRegistryURL) {
325            this.registryURL = discoveryRegistryURL;
326        }
327    
328        public long getUpdateInterval() {
329            return updateInterval;
330        }
331    
332        public void setUpdateInterval(long updateInterval) {
333            this.updateInterval = updateInterval;
334        }
335    
336        public boolean isStartEmbeddRegistry() {
337            return startEmbeddRegistry;
338        }
339    
340        public void setStartEmbeddRegistry(boolean startEmbeddRegistry) {
341            this.startEmbeddRegistry = startEmbeddRegistry;
342        }
343    }