001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.discovery.simple;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.util.concurrent.atomic.AtomicBoolean;
022    
023    import org.apache.activemq.command.DiscoveryEvent;
024    import org.apache.activemq.thread.TaskRunnerFactory;
025    import org.apache.activemq.transport.discovery.DiscoveryAgent;
026    import org.apache.activemq.transport.discovery.DiscoveryListener;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * A simple DiscoveryAgent that allows static configuration of the discovered
032     * services.
033     * 
034     * 
035     */
036    public class SimpleDiscoveryAgent implements DiscoveryAgent {
037    
038        private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class);
039        private long initialReconnectDelay = 1000;
040        private long maxReconnectDelay = 1000 * 30;
041        private long backOffMultiplier = 2;
042        private boolean useExponentialBackOff=true;
043        private int maxReconnectAttempts;
044        private final Object sleepMutex = new Object();
045        private long minConnectTime = 5000;
046        private DiscoveryListener listener;
047        private String services[] = new String[] {};
048        private final AtomicBoolean running = new AtomicBoolean(false);
049        private TaskRunnerFactory taskRunner;
050    
051        class SimpleDiscoveryEvent extends DiscoveryEvent {
052    
053            private int connectFailures;
054            private long reconnectDelay = initialReconnectDelay;
055            private long connectTime = System.currentTimeMillis();
056            private AtomicBoolean failed = new AtomicBoolean(false);
057    
058            public SimpleDiscoveryEvent(String service) {
059                super(service);
060            }
061    
062                    public SimpleDiscoveryEvent(SimpleDiscoveryEvent copy) {
063                            super(copy);
064                            connectFailures = copy.connectFailures;
065                            reconnectDelay = copy.reconnectDelay;
066                            connectTime = copy.connectTime;
067                            failed.set(copy.failed.get());
068                    }
069            
070            @Override
071            public String toString() {
072                return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]";
073            }
074        }
075    
076        public void setDiscoveryListener(DiscoveryListener listener) {
077            this.listener = listener;
078        }
079    
080        public void registerService(String name) throws IOException {
081        }
082    
083        public void start() throws Exception {
084            taskRunner = new TaskRunnerFactory();
085            taskRunner.init();
086    
087            running.set(true);
088            for (int i = 0; i < services.length; i++) {
089                listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
090            }
091        }
092    
093        public void stop() throws Exception {
094            running.set(false);
095    
096            taskRunner.shutdown();
097    
098            // TODO: Should we not remove the services on the listener?
099    
100            synchronized (sleepMutex) {
101                sleepMutex.notifyAll();
102            }
103        }
104    
105        public String[] getServices() {
106            return services;
107        }
108    
109        public void setServices(String services) {
110            this.services = services.split(",");
111        }
112    
113        public void setServices(String services[]) {
114            this.services = services;
115        }
116    
117        public void setServices(URI services[]) {
118            this.services = new String[services.length];
119            for (int i = 0; i < services.length; i++) {
120                this.services[i] = services[i].toString();
121            }
122        }
123    
124        public void serviceFailed(DiscoveryEvent devent) throws IOException {
125    
126            final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent;
127            if (sevent.failed.compareAndSet(false, true)) {
128    
129                listener.onServiceRemove(sevent);
130                taskRunner.execute(new Runnable() {
131                    public void run() {
132                        SimpleDiscoveryEvent event = new SimpleDiscoveryEvent(sevent);
133                            
134                        // We detect a failed connection attempt because the service
135                        // fails right away.
136                        if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
137                            LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: "+event);
138    
139                            event.connectFailures++;
140    
141                            if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
142                                LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries.  Reconnecting has been disabled for: " + event);
143                                return;
144                            }
145    
146                            synchronized (sleepMutex) {
147                                try {
148                                    if (!running.get()) {
149                                        LOG.debug("Reconnecting disabled: stopped");
150                                        return;
151                                    }
152    
153                                    LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
154                                    sleepMutex.wait(event.reconnectDelay);
155                                } catch (InterruptedException ie) {
156                                    LOG.debug("Reconnecting disabled: " + ie);
157                                    Thread.currentThread().interrupt();
158                                    return;
159                                }
160                            }
161    
162                            if (!useExponentialBackOff) {
163                                event.reconnectDelay = initialReconnectDelay;
164                            } else {
165                                // Exponential increment of reconnect delay.
166                                event.reconnectDelay *= backOffMultiplier;
167                                if (event.reconnectDelay > maxReconnectDelay) {
168                                    event.reconnectDelay = maxReconnectDelay;
169                                }
170                            }
171    
172                        } else {
173                            event.connectFailures = 0;
174                            event.reconnectDelay = initialReconnectDelay;
175                        }
176    
177                        if (!running.get()) {
178                            LOG.debug("Reconnecting disabled: stopped");
179                            return;
180                        }
181    
182                        event.connectTime = System.currentTimeMillis();
183                        event.failed.set(false);
184                        listener.onServiceAdd(event);
185                    }
186                }, "Simple Discovery Agent");
187            }
188        }
189    
190        public long getBackOffMultiplier() {
191            return backOffMultiplier;
192        }
193    
194        public void setBackOffMultiplier(long backOffMultiplier) {
195            this.backOffMultiplier = backOffMultiplier;
196        }
197    
198        public long getInitialReconnectDelay() {
199            return initialReconnectDelay;
200        }
201    
202        public void setInitialReconnectDelay(long initialReconnectDelay) {
203            this.initialReconnectDelay = initialReconnectDelay;
204        }
205    
206        public int getMaxReconnectAttempts() {
207            return maxReconnectAttempts;
208        }
209    
210        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
211            this.maxReconnectAttempts = maxReconnectAttempts;
212        }
213    
214        public long getMaxReconnectDelay() {
215            return maxReconnectDelay;
216        }
217    
218        public void setMaxReconnectDelay(long maxReconnectDelay) {
219            this.maxReconnectDelay = maxReconnectDelay;
220        }
221    
222        public long getMinConnectTime() {
223            return minConnectTime;
224        }
225    
226        public void setMinConnectTime(long minConnectTime) {
227            this.minConnectTime = minConnectTime;
228        }
229    
230        public boolean isUseExponentialBackOff() {
231            return useExponentialBackOff;
232        }
233    
234        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
235            this.useExponentialBackOff = useExponentialBackOff;
236        }
237    }