001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport;
018
019 import java.io.IOException;
020 import java.util.Timer;
021 import java.util.concurrent.SynchronousQueue;
022 import java.util.concurrent.ThreadFactory;
023 import java.util.concurrent.ThreadPoolExecutor;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.atomic.AtomicBoolean;
026 import java.util.concurrent.atomic.AtomicInteger;
027 import java.util.concurrent.locks.ReentrantReadWriteLock;
028
029 import org.apache.activemq.command.KeepAliveInfo;
030 import org.apache.activemq.command.WireFormatInfo;
031 import org.apache.activemq.thread.SchedulerTimerTask;
032 import org.apache.activemq.util.ThreadPoolUtils;
033 import org.apache.activemq.wireformat.WireFormat;
034 import org.slf4j.Logger;
035 import org.slf4j.LoggerFactory;
036
037 /**
038 * Used to make sure that commands are arriving periodically from the peer of
039 * the transport.
040 */
041 public abstract class AbstractInactivityMonitor extends TransportFilter {
042
043 private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class);
044
045 private static ThreadPoolExecutor ASYNC_TASKS;
046 private static int CHECKER_COUNTER;
047 private static long DEFAULT_CHECK_TIME_MILLS = 30000;
048 private static Timer READ_CHECK_TIMER;
049 private static Timer WRITE_CHECK_TIMER;
050
051 private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
052
053 private final AtomicBoolean commandSent = new AtomicBoolean(false);
054 private final AtomicBoolean inSend = new AtomicBoolean(false);
055 private final AtomicBoolean failed = new AtomicBoolean(false);
056
057 private final AtomicBoolean commandReceived = new AtomicBoolean(true);
058 private final AtomicBoolean inReceive = new AtomicBoolean(false);
059 private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
060
061 private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
062
063 private SchedulerTimerTask writeCheckerTask;
064 private SchedulerTimerTask readCheckerTask;
065
066 private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
067 private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
068 private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
069 private boolean useKeepAlive = true;
070 private boolean keepAliveResponseRequired;
071
072 protected WireFormat wireFormat;
073
074 private final Runnable readChecker = new Runnable() {
075 long lastRunTime;
076 public void run() {
077 long now = System.currentTimeMillis();
078 long elapsed = (now-lastRunTime);
079
080 if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
081 LOG.debug(""+elapsed+" ms elapsed since last read check.");
082 }
083
084 // Perhaps the timer executed a read check late.. and then executes
085 // the next read check on time which causes the time elapsed between
086 // read checks to be small..
087
088 // If less than 90% of the read check Time elapsed then abort this readcheck.
089 if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
090 LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
091 return;
092 }
093
094 lastRunTime = now;
095 readCheck();
096 }
097
098 @Override
099 public String toString() {
100 return "ReadChecker";
101 }
102 };
103
104 private boolean allowReadCheck(long elapsed) {
105 return elapsed > (readCheckTime * 9 / 10);
106 }
107
108 private final Runnable writeChecker = new Runnable() {
109 long lastRunTime;
110 public void run() {
111 long now = System.currentTimeMillis();
112 if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
113 LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
114
115 }
116 lastRunTime = now;
117 writeCheck();
118 }
119
120 @Override
121 public String toString() {
122 return "WriteChecker";
123 }
124 };
125
126 public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
127 super(next);
128 this.wireFormat = wireFormat;
129 }
130
131 public void start() throws Exception {
132 next.start();
133 startMonitorThreads();
134 }
135
136 public void stop() throws Exception {
137 stopMonitorThreads();
138 next.stop();
139 }
140
141 final void writeCheck() {
142 if (inSend.get()) {
143 if (LOG.isTraceEnabled()) {
144 LOG.trace("A send is in progress");
145 }
146 return;
147 }
148
149 if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
150 if (LOG.isTraceEnabled()) {
151 LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
152 }
153 ASYNC_TASKS.execute(new Runnable() {
154 public void run() {
155 if (LOG.isDebugEnabled()) {
156 LOG.debug("Running {}", this);
157 }
158 if (monitorStarted.get()) {
159 try {
160 // If we can't get the lock it means another write beat us into the
161 // send and we don't need to heart beat now.
162 if (sendLock.writeLock().tryLock()) {
163 KeepAliveInfo info = new KeepAliveInfo();
164 info.setResponseRequired(keepAliveResponseRequired);
165 doOnewaySend(info);
166 }
167 } catch (IOException e) {
168 onException(e);
169 } finally {
170 if (sendLock.writeLock().isHeldByCurrentThread()) {
171 sendLock.writeLock().unlock();
172 }
173 }
174 }
175 }
176
177 @Override
178 public String toString() {
179 return "WriteCheck[" + getRemoteAddress() + "]";
180 };
181 });
182 } else {
183 if (LOG.isTraceEnabled()) {
184 LOG.trace(this + " message sent since last write check, resetting flag");
185 }
186 }
187
188 commandSent.set(false);
189 }
190
191 final void readCheck() {
192 int currentCounter = next.getReceiveCounter();
193 int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
194 if (inReceive.get() || currentCounter!=previousCounter ) {
195 if (LOG.isTraceEnabled()) {
196 LOG.trace("A receive is in progress");
197 }
198 return;
199 }
200 if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
201 if (LOG.isDebugEnabled()) {
202 LOG.debug("No message received since last read check for " + toString() + ". Throwing InactivityIOException.");
203 }
204 ASYNC_TASKS.execute(new Runnable() {
205 public void run() {
206 if (LOG.isDebugEnabled()) {
207 LOG.debug("Running {}", this);
208 }
209 onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
210 }
211
212 @Override
213 public String toString() {
214 return "ReadCheck[" + getRemoteAddress() + "]";
215 };
216 });
217 } else {
218 if (LOG.isTraceEnabled()) {
219 LOG.trace("Message received since last read check, resetting flag: ");
220 }
221 }
222 commandReceived.set(false);
223 }
224
225 protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException;
226 protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException;
227
228 public void onCommand(Object command) {
229 commandReceived.set(true);
230 inReceive.set(true);
231 try {
232 if (command.getClass() == KeepAliveInfo.class) {
233 KeepAliveInfo info = (KeepAliveInfo) command;
234 if (info.isResponseRequired()) {
235 sendLock.readLock().lock();
236 try {
237 info.setResponseRequired(false);
238 oneway(info);
239 } catch (IOException e) {
240 onException(e);
241 } finally {
242 sendLock.readLock().unlock();
243 }
244 }
245 } else {
246 if (command.getClass() == WireFormatInfo.class) {
247 synchronized (this) {
248 try {
249 processInboundWireFormatInfo((WireFormatInfo) command);
250 } catch (IOException e) {
251 onException(e);
252 }
253 }
254 }
255
256 transportListener.onCommand(command);
257 }
258 } finally {
259 inReceive.set(false);
260 }
261 }
262
263 public void oneway(Object o) throws IOException {
264 // To prevent the inactivity monitor from sending a message while we
265 // are performing a send we take a read lock. The inactivity monitor
266 // sends its Heart-beat commands under a write lock. This means that
267 // the MutexTransport is still responsible for synchronizing sends
268 this.sendLock.readLock().lock();
269 inSend.set(true);
270 try {
271 doOnewaySend(o);
272 } finally {
273 commandSent.set(true);
274 inSend.set(false);
275 this.sendLock.readLock().unlock();
276 }
277 }
278
279 // Must be called under lock, either read or write on sendLock.
280 private void doOnewaySend(Object command) throws IOException {
281 if( failed.get() ) {
282 throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
283 }
284 if (command.getClass() == WireFormatInfo.class) {
285 synchronized (this) {
286 processOutboundWireFormatInfo((WireFormatInfo) command);
287 }
288 }
289 next.oneway(command);
290 }
291
292 public void onException(IOException error) {
293 if (failed.compareAndSet(false, true)) {
294 stopMonitorThreads();
295 transportListener.onException(error);
296 }
297 }
298
299 public void setUseKeepAlive(boolean val) {
300 useKeepAlive = val;
301 }
302
303 public long getReadCheckTime() {
304 return readCheckTime;
305 }
306
307 public void setReadCheckTime(long readCheckTime) {
308 this.readCheckTime = readCheckTime;
309 }
310
311 public long getWriteCheckTime() {
312 return writeCheckTime;
313 }
314
315 public void setWriteCheckTime(long writeCheckTime) {
316 this.writeCheckTime = writeCheckTime;
317 }
318
319 public long getInitialDelayTime() {
320 return initialDelayTime;
321 }
322
323 public void setInitialDelayTime(long initialDelayTime) {
324 this.initialDelayTime = initialDelayTime;
325 }
326
327 public boolean isKeepAliveResponseRequired() {
328 return this.keepAliveResponseRequired;
329 }
330
331 public void setKeepAliveResponseRequired(boolean value) {
332 this.keepAliveResponseRequired = value;
333 }
334
335 public boolean isMonitorStarted() {
336 return this.monitorStarted.get();
337 }
338
339 protected synchronized void startMonitorThreads() throws IOException {
340 if (monitorStarted.get()) {
341 return;
342 }
343
344 if (!configuredOk()) {
345 return;
346 }
347
348 if (readCheckTime > 0) {
349 readCheckerTask = new SchedulerTimerTask(readChecker);
350 }
351
352 if (writeCheckTime > 0) {
353 writeCheckerTask = new SchedulerTimerTask(writeChecker);
354 }
355
356 if (writeCheckTime > 0 || readCheckTime > 0) {
357 monitorStarted.set(true);
358 synchronized(AbstractInactivityMonitor.class) {
359 if( CHECKER_COUNTER == 0 ) {
360 ASYNC_TASKS = createExecutor();
361 READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer",true);
362 WRITE_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor WriteCheckTimer",true);
363 }
364 CHECKER_COUNTER++;
365 if (readCheckTime > 0) {
366 READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
367 }
368 if (writeCheckTime > 0) {
369 WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
370 }
371 }
372 }
373 }
374
375 abstract protected boolean configuredOk() throws IOException;
376
377 protected synchronized void stopMonitorThreads() {
378 if (monitorStarted.compareAndSet(true, false)) {
379 if (readCheckerTask != null) {
380 readCheckerTask.cancel();
381 }
382 if (writeCheckerTask != null) {
383 writeCheckerTask.cancel();
384 }
385 synchronized( AbstractInactivityMonitor.class ) {
386 WRITE_CHECK_TIMER.purge();
387 READ_CHECK_TIMER.purge();
388 CHECKER_COUNTER--;
389 if(CHECKER_COUNTER==0) {
390 WRITE_CHECK_TIMER.cancel();
391 READ_CHECK_TIMER.cancel();
392 WRITE_CHECK_TIMER = null;
393 READ_CHECK_TIMER = null;
394 ThreadPoolUtils.shutdown(ASYNC_TASKS);
395 ASYNC_TASKS = null;
396 }
397 }
398 }
399 }
400
401 private ThreadFactory factory = new ThreadFactory() {
402 public Thread newThread(Runnable runnable) {
403 Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker");
404 thread.setDaemon(true);
405 return thread;
406 }
407 };
408
409 private ThreadPoolExecutor createExecutor() {
410 // TODO: This value of 10 seconds seems to low, see discussion at
411 // http://activemq.2283324.n4.nabble.com/InactivityMonitor-Creating-too-frequent-threads-tp4656752.html;cid=1348142445209-351
412 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
413 exec.allowCoreThreadTimeOut(true);
414 return exec;
415 }
416 }