001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport.discovery.simple;
018    
019    import java.io.IOException;
020    import java.net.URI;
021    import java.util.concurrent.atomic.AtomicBoolean;
022    
023    import org.apache.activemq.command.DiscoveryEvent;
024    import org.apache.activemq.thread.TaskRunnerFactory;
025    import org.apache.activemq.transport.discovery.DiscoveryAgent;
026    import org.apache.activemq.transport.discovery.DiscoveryListener;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * A simple DiscoveryAgent that allows static configuration of the discovered
032     * services.
033     * 
034     * 
035     */
036    public class SimpleDiscoveryAgent implements DiscoveryAgent {
037    
038        private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class);
039        private long initialReconnectDelay = 1000;
040        private long maxReconnectDelay = 1000 * 30;
041        private long backOffMultiplier = 2;
042        private boolean useExponentialBackOff=true;
043        private int maxReconnectAttempts;
044        private final Object sleepMutex = new Object();
045        private long minConnectTime = 5000;
046        private DiscoveryListener listener;
047        private String services[] = new String[] {};
048        private final AtomicBoolean running = new AtomicBoolean(false);
049        private TaskRunnerFactory taskRunner;
050    
051        class SimpleDiscoveryEvent extends DiscoveryEvent {
052    
053            private int connectFailures;
054            private long reconnectDelay = initialReconnectDelay;
055            private long connectTime = System.currentTimeMillis();
056            private AtomicBoolean failed = new AtomicBoolean(false);
057    
058            public SimpleDiscoveryEvent(String service) {
059                super(service);
060            }
061    
062            @Override
063            public String toString() {
064                return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]";
065            }
066        }
067    
068        public void setDiscoveryListener(DiscoveryListener listener) {
069            this.listener = listener;
070        }
071    
072        public void registerService(String name) throws IOException {
073        }
074    
075        public void start() throws Exception {
076            taskRunner = new TaskRunnerFactory();
077            taskRunner.init();
078    
079            running.set(true);
080            for (int i = 0; i < services.length; i++) {
081                listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
082            }
083        }
084    
085        public void stop() throws Exception {
086            running.set(false);
087    
088            taskRunner.shutdown();
089    
090            // TODO: Should we not remove the services on the listener?
091    
092            synchronized (sleepMutex) {
093                sleepMutex.notifyAll();
094            }
095        }
096    
097        public String[] getServices() {
098            return services;
099        }
100    
101        public void setServices(String services) {
102            this.services = services.split(",");
103        }
104    
105        public void setServices(String services[]) {
106            this.services = services;
107        }
108    
109        public void setServices(URI services[]) {
110            this.services = new String[services.length];
111            for (int i = 0; i < services.length; i++) {
112                this.services[i] = services[i].toString();
113            }
114        }
115    
116        public void serviceFailed(DiscoveryEvent devent) throws IOException {
117    
118            final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
119            if (event.failed.compareAndSet(false, true)) {
120    
121                listener.onServiceRemove(event);
122                taskRunner.execute(new Runnable() {
123                    public void run() {
124    
125                        // We detect a failed connection attempt because the service
126                        // fails right
127                        // away.
128                        if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
129                            LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: "+event);
130    
131                            event.connectFailures++;
132    
133                            if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
134                                LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries.  Reconnecting has been disabled for: " + event);
135                                return;
136                            }
137    
138                            synchronized (sleepMutex) {
139                                try {
140                                    if (!running.get()) {
141                                        LOG.debug("Reconnecting disabled: stopped");
142                                        return;
143                                    }
144    
145                                    LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
146                                    sleepMutex.wait(event.reconnectDelay);
147                                } catch (InterruptedException ie) {
148                                    LOG.debug("Reconnecting disabled: " + ie);
149                                    Thread.currentThread().interrupt();
150                                    return;
151                                }
152                            }
153    
154                            if (!useExponentialBackOff) {
155                                event.reconnectDelay = initialReconnectDelay;
156                            } else {
157                                // Exponential increment of reconnect delay.
158                                event.reconnectDelay *= backOffMultiplier;
159                                if (event.reconnectDelay > maxReconnectDelay) {
160                                    event.reconnectDelay = maxReconnectDelay;
161                                }
162                            }
163    
164                        } else {
165                            event.connectFailures = 0;
166                            event.reconnectDelay = initialReconnectDelay;
167                        }
168    
169                        if (!running.get()) {
170                            LOG.debug("Reconnecting disabled: stopped");
171                            return;
172                        }
173    
174                        event.connectTime = System.currentTimeMillis();
175                        event.failed.set(false);
176                        listener.onServiceAdd(event);
177                    }
178                }, "Simple Discovery Agent");
179            }
180        }
181    
182        public long getBackOffMultiplier() {
183            return backOffMultiplier;
184        }
185    
186        public void setBackOffMultiplier(long backOffMultiplier) {
187            this.backOffMultiplier = backOffMultiplier;
188        }
189    
190        public long getInitialReconnectDelay() {
191            return initialReconnectDelay;
192        }
193    
194        public void setInitialReconnectDelay(long initialReconnectDelay) {
195            this.initialReconnectDelay = initialReconnectDelay;
196        }
197    
198        public int getMaxReconnectAttempts() {
199            return maxReconnectAttempts;
200        }
201    
202        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
203            this.maxReconnectAttempts = maxReconnectAttempts;
204        }
205    
206        public long getMaxReconnectDelay() {
207            return maxReconnectDelay;
208        }
209    
210        public void setMaxReconnectDelay(long maxReconnectDelay) {
211            this.maxReconnectDelay = maxReconnectDelay;
212        }
213    
214        public long getMinConnectTime() {
215            return minConnectTime;
216        }
217    
218        public void setMinConnectTime(long minConnectTime) {
219            this.minConnectTime = minConnectTime;
220        }
221    
222        public boolean isUseExponentialBackOff() {
223            return useExponentialBackOff;
224        }
225    
226        public void setUseExponentialBackOff(boolean useExponentialBackOff) {
227            this.useExponentialBackOff = useExponentialBackOff;
228        }
229    }