001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.discovery.simple;
018
019 import java.io.IOException;
020 import java.net.URI;
021 import java.util.concurrent.atomic.AtomicBoolean;
022
023 import org.apache.activemq.command.DiscoveryEvent;
024 import org.apache.activemq.thread.TaskRunnerFactory;
025 import org.apache.activemq.transport.discovery.DiscoveryAgent;
026 import org.apache.activemq.transport.discovery.DiscoveryListener;
027 import org.slf4j.Logger;
028 import org.slf4j.LoggerFactory;
029
030 /**
031 * A simple DiscoveryAgent that allows static configuration of the discovered
032 * services.
033 *
034 *
035 */
036 public class SimpleDiscoveryAgent implements DiscoveryAgent {
037
038 private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class);
039 private long initialReconnectDelay = 1000;
040 private long maxReconnectDelay = 1000 * 30;
041 private long backOffMultiplier = 2;
042 private boolean useExponentialBackOff=true;
043 private int maxReconnectAttempts;
044 private final Object sleepMutex = new Object();
045 private long minConnectTime = 5000;
046 private DiscoveryListener listener;
047 private String services[] = new String[] {};
048 private final AtomicBoolean running = new AtomicBoolean(false);
049 private TaskRunnerFactory taskRunner;
050
051 class SimpleDiscoveryEvent extends DiscoveryEvent {
052
053 private int connectFailures;
054 private long reconnectDelay = initialReconnectDelay;
055 private long connectTime = System.currentTimeMillis();
056 private AtomicBoolean failed = new AtomicBoolean(false);
057
058 public SimpleDiscoveryEvent(String service) {
059 super(service);
060 }
061
062 @Override
063 public String toString() {
064 return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]";
065 }
066 }
067
068 public void setDiscoveryListener(DiscoveryListener listener) {
069 this.listener = listener;
070 }
071
072 public void registerService(String name) throws IOException {
073 }
074
075 public void start() throws Exception {
076 taskRunner = new TaskRunnerFactory();
077 taskRunner.init();
078
079 running.set(true);
080 for (int i = 0; i < services.length; i++) {
081 listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
082 }
083 }
084
085 public void stop() throws Exception {
086 running.set(false);
087
088 taskRunner.shutdown();
089
090 // TODO: Should we not remove the services on the listener?
091
092 synchronized (sleepMutex) {
093 sleepMutex.notifyAll();
094 }
095 }
096
097 public String[] getServices() {
098 return services;
099 }
100
101 public void setServices(String services) {
102 this.services = services.split(",");
103 }
104
105 public void setServices(String services[]) {
106 this.services = services;
107 }
108
109 public void setServices(URI services[]) {
110 this.services = new String[services.length];
111 for (int i = 0; i < services.length; i++) {
112 this.services[i] = services[i].toString();
113 }
114 }
115
116 public void serviceFailed(DiscoveryEvent devent) throws IOException {
117
118 final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
119 if (event.failed.compareAndSet(false, true)) {
120
121 listener.onServiceRemove(event);
122 taskRunner.execute(new Runnable() {
123 public void run() {
124
125 // We detect a failed connection attempt because the service
126 // fails right
127 // away.
128 if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
129 LOG.debug("Failure occurred soon after the discovery event was generated. It will be classified as a connection failure: "+event);
130
131 event.connectFailures++;
132
133 if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
134 LOG.warn("Reconnect attempts exceeded "+maxReconnectAttempts+" tries. Reconnecting has been disabled for: " + event);
135 return;
136 }
137
138 synchronized (sleepMutex) {
139 try {
140 if (!running.get()) {
141 LOG.debug("Reconnecting disabled: stopped");
142 return;
143 }
144
145 LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
146 sleepMutex.wait(event.reconnectDelay);
147 } catch (InterruptedException ie) {
148 LOG.debug("Reconnecting disabled: " + ie);
149 Thread.currentThread().interrupt();
150 return;
151 }
152 }
153
154 if (!useExponentialBackOff) {
155 event.reconnectDelay = initialReconnectDelay;
156 } else {
157 // Exponential increment of reconnect delay.
158 event.reconnectDelay *= backOffMultiplier;
159 if (event.reconnectDelay > maxReconnectDelay) {
160 event.reconnectDelay = maxReconnectDelay;
161 }
162 }
163
164 } else {
165 event.connectFailures = 0;
166 event.reconnectDelay = initialReconnectDelay;
167 }
168
169 if (!running.get()) {
170 LOG.debug("Reconnecting disabled: stopped");
171 return;
172 }
173
174 event.connectTime = System.currentTimeMillis();
175 event.failed.set(false);
176 listener.onServiceAdd(event);
177 }
178 }, "Simple Discovery Agent");
179 }
180 }
181
182 public long getBackOffMultiplier() {
183 return backOffMultiplier;
184 }
185
186 public void setBackOffMultiplier(long backOffMultiplier) {
187 this.backOffMultiplier = backOffMultiplier;
188 }
189
190 public long getInitialReconnectDelay() {
191 return initialReconnectDelay;
192 }
193
194 public void setInitialReconnectDelay(long initialReconnectDelay) {
195 this.initialReconnectDelay = initialReconnectDelay;
196 }
197
198 public int getMaxReconnectAttempts() {
199 return maxReconnectAttempts;
200 }
201
202 public void setMaxReconnectAttempts(int maxReconnectAttempts) {
203 this.maxReconnectAttempts = maxReconnectAttempts;
204 }
205
206 public long getMaxReconnectDelay() {
207 return maxReconnectDelay;
208 }
209
210 public void setMaxReconnectDelay(long maxReconnectDelay) {
211 this.maxReconnectDelay = maxReconnectDelay;
212 }
213
214 public long getMinConnectTime() {
215 return minConnectTime;
216 }
217
218 public void setMinConnectTime(long minConnectTime) {
219 this.minConnectTime = minConnectTime;
220 }
221
222 public boolean isUseExponentialBackOff() {
223 return useExponentialBackOff;
224 }
225
226 public void setUseExponentialBackOff(boolean useExponentialBackOff) {
227 this.useExponentialBackOff = useExponentialBackOff;
228 }
229 }