001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport;
018
019import java.io.IOException;
020import java.util.Timer;
021import java.util.concurrent.RejectedExecutionException;
022import java.util.concurrent.SynchronousQueue;
023import java.util.concurrent.ThreadFactory;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.concurrent.locks.ReentrantReadWriteLock;
029
030import org.apache.activemq.command.KeepAliveInfo;
031import org.apache.activemq.command.WireFormatInfo;
032import org.apache.activemq.thread.SchedulerTimerTask;
033import org.apache.activemq.wireformat.WireFormat;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Used to make sure that commands are arriving periodically from the peer of
039 * the transport.
040 */
041public abstract class AbstractInactivityMonitor extends TransportFilter {
042
043    private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class);
044
045    private static final long DEFAULT_CHECK_TIME_MILLS = 30000;
046
047    private static ThreadPoolExecutor ASYNC_TASKS;
048    private static int CHECKER_COUNTER;
049    private static Timer READ_CHECK_TIMER;
050    private static Timer WRITE_CHECK_TIMER;
051
052    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
053
054    private final AtomicBoolean commandSent = new AtomicBoolean(false);
055    private final AtomicBoolean inSend = new AtomicBoolean(false);
056    private final AtomicBoolean failed = new AtomicBoolean(false);
057
058    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
059    private final AtomicBoolean inReceive = new AtomicBoolean(false);
060    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
061
062    private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
063
064    private SchedulerTimerTask connectCheckerTask;
065    private SchedulerTimerTask writeCheckerTask;
066    private SchedulerTimerTask readCheckerTask;
067
068    private long connectAttemptTimeout = DEFAULT_CHECK_TIME_MILLS;
069    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
070    private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
071    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
072    private boolean useKeepAlive = true;
073    private boolean keepAliveResponseRequired;
074
075    protected WireFormat wireFormat;
076
077    private final Runnable connectChecker = new Runnable() {
078
079        private final long startTime = System.currentTimeMillis();
080
081        @Override
082        public void run() {
083            long now = System.currentTimeMillis();
084
085            if ((now - startTime) >= connectAttemptTimeout && connectCheckerTask != null && !ASYNC_TASKS.isShutdown()) {
086                LOG.debug("No connection attempt made in time for {}! Throwing InactivityIOException.", AbstractInactivityMonitor.this.toString());
087                try {
088                    ASYNC_TASKS.execute(new Runnable() {
089                        @Override
090                        public void run() {
091                            onException(new InactivityIOException(
092                                "Channel was inactive (no connection attempt made) for too (>" + (connectAttemptTimeout) + ") long: " + next.getRemoteAddress()));
093                        }
094                    });
095                } catch (RejectedExecutionException ex) {
096                    if (!ASYNC_TASKS.isShutdown()) {
097                        LOG.error("Async connection timeout task was rejected from the executor: ", ex);
098                        throw ex;
099                    }
100                }
101            }
102        }
103    };
104
105    private final Runnable readChecker = new Runnable() {
106        long lastRunTime;
107
108        @Override
109        public void run() {
110            long now = System.currentTimeMillis();
111            long elapsed = (now - lastRunTime);
112
113            if (lastRunTime != 0) {
114                LOG.debug("{}ms elapsed since last read check.", elapsed);
115            }
116
117            // Perhaps the timer executed a read check late.. and then executes
118            // the next read check on time which causes the time elapsed between
119            // read checks to be small..
120
121            // If less than 90% of the read check Time elapsed then abort this
122            // read check.
123            if (!allowReadCheck(elapsed)) {
124                LOG.debug("Aborting read check...Not enough time elapsed since last read check.");
125                return;
126            }
127
128            lastRunTime = now;
129            readCheck();
130        }
131
132        @Override
133        public String toString() {
134            return "ReadChecker";
135        }
136    };
137
138    private boolean allowReadCheck(long elapsed) {
139        return elapsed > (readCheckTime * 9 / 10);
140    }
141
142    private final Runnable writeChecker = new Runnable() {
143        long lastRunTime;
144
145        @Override
146        public void run() {
147            long now = System.currentTimeMillis();
148            if (lastRunTime != 0) {
149                LOG.debug("{}: {}ms elapsed since last write check.", this, (now - lastRunTime));
150            }
151            lastRunTime = now;
152            writeCheck();
153        }
154
155        @Override
156        public String toString() {
157            return "WriteChecker";
158        }
159    };
160
161    public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
162        super(next);
163        this.wireFormat = wireFormat;
164    }
165
166    @Override
167    public void start() throws Exception {
168        next.start();
169        startMonitorThreads();
170    }
171
172    @Override
173    public void stop() throws Exception {
174        stopMonitorThreads();
175        next.stop();
176    }
177
178    final void writeCheck() {
179        if (inSend.get()) {
180            LOG.trace("Send in progress. Skipping write check.");
181            return;
182        }
183
184        if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isShutdown()) {
185            LOG.trace("{} no message sent since last write check, sending a KeepAliveInfo", this);
186
187            try {
188                ASYNC_TASKS.execute(new Runnable() {
189                    @Override
190                    public void run() {
191                        LOG.debug("Running {}", this);
192                        if (monitorStarted.get()) {
193                            try {
194                                // If we can't get the lock it means another
195                                // write beat us into the
196                                // send and we don't need to heart beat now.
197                                if (sendLock.writeLock().tryLock()) {
198                                    KeepAliveInfo info = new KeepAliveInfo();
199                                    info.setResponseRequired(keepAliveResponseRequired);
200                                    doOnewaySend(info);
201                                }
202                            } catch (IOException e) {
203                                onException(e);
204                            } finally {
205                                if (sendLock.writeLock().isHeldByCurrentThread()) {
206                                    sendLock.writeLock().unlock();
207                                }
208                            }
209                        }
210                    }
211
212                    @Override
213                    public String toString() {
214                        return "WriteCheck[" + getRemoteAddress() + "]";
215                    };
216                });
217            } catch (RejectedExecutionException ex) {
218                if (!ASYNC_TASKS.isShutdown()) {
219                    LOG.error("Async write check was rejected from the executor: ", ex);
220                    throw ex;
221                }
222            }
223        } else {
224            LOG.trace("{} message sent since last write check, resetting flag.", this);
225        }
226
227        commandSent.set(false);
228    }
229
230    final void readCheck() {
231        int currentCounter = next.getReceiveCounter();
232        int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
233        if (inReceive.get() || currentCounter != previousCounter) {
234            LOG.trace("A receive is in progress, skipping read check.");
235            return;
236        }
237        if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isShutdown()) {
238            LOG.debug("No message received since last read check for {}. Throwing InactivityIOException.", this);
239
240            try {
241                ASYNC_TASKS.execute(new Runnable() {
242                    @Override
243                    public void run() {
244                        LOG.debug("Running {}", this);
245                        onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
246                    }
247
248                    @Override
249                    public String toString() {
250                        return "ReadCheck[" + getRemoteAddress() + "]";
251                    };
252                });
253            } catch (RejectedExecutionException ex) {
254                if (!ASYNC_TASKS.isShutdown()) {
255                    LOG.error("Async read check was rejected from the executor: ", ex);
256                    throw ex;
257                }
258            }
259        } else {
260            if (LOG.isTraceEnabled()) {
261                LOG.trace("Message received since last read check, resetting flag: ");
262            }
263        }
264        commandReceived.set(false);
265    }
266
267    protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException;
268
269    protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException;
270
271    @Override
272    public void onCommand(Object command) {
273        commandReceived.set(true);
274        inReceive.set(true);
275        try {
276            if (command.getClass() == KeepAliveInfo.class) {
277                KeepAliveInfo info = (KeepAliveInfo) command;
278                if (info.isResponseRequired()) {
279                    sendLock.readLock().lock();
280                    try {
281                        info.setResponseRequired(false);
282                        oneway(info);
283                    } catch (IOException e) {
284                        onException(e);
285                    } finally {
286                        sendLock.readLock().unlock();
287                    }
288                }
289            } else {
290                if (command.getClass() == WireFormatInfo.class) {
291                    synchronized (this) {
292                        try {
293                            processInboundWireFormatInfo((WireFormatInfo) command);
294                        } catch (IOException e) {
295                            onException(e);
296                        }
297                    }
298                }
299
300                transportListener.onCommand(command);
301            }
302        } finally {
303            inReceive.set(false);
304        }
305    }
306
307    @Override
308    public void oneway(Object o) throws IOException {
309        // To prevent the inactivity monitor from sending a message while we
310        // are performing a send we take a read lock. The inactivity monitor
311        // sends its Heart-beat commands under a write lock. This means that
312        // the MutexTransport is still responsible for synchronizing sends
313        sendLock.readLock().lock();
314        inSend.set(true);
315        try {
316            doOnewaySend(o);
317        } finally {
318            commandSent.set(true);
319            inSend.set(false);
320            sendLock.readLock().unlock();
321        }
322    }
323
324    // Must be called under lock, either read or write on sendLock.
325    private void doOnewaySend(Object command) throws IOException {
326        if (failed.get()) {
327            throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
328        }
329        if (command.getClass() == WireFormatInfo.class) {
330            synchronized (this) {
331                processOutboundWireFormatInfo((WireFormatInfo) command);
332            }
333        }
334        next.oneway(command);
335    }
336
337    @Override
338    public void onException(IOException error) {
339        if (failed.compareAndSet(false, true)) {
340            stopMonitorThreads();
341            if (sendLock.writeLock().isHeldByCurrentThread()) {
342                sendLock.writeLock().unlock();
343            }
344            transportListener.onException(error);
345        }
346    }
347
348    public void setUseKeepAlive(boolean val) {
349        useKeepAlive = val;
350    }
351
352    public long getConnectAttemptTimeout() {
353        return connectAttemptTimeout;
354    }
355
356    public void setConnectAttemptTimeout(long connectionTimeout) {
357        this.connectAttemptTimeout = connectionTimeout;
358    }
359
360    public long getReadCheckTime() {
361        return readCheckTime;
362    }
363
364    public void setReadCheckTime(long readCheckTime) {
365        this.readCheckTime = readCheckTime;
366    }
367
368    public long getWriteCheckTime() {
369        return writeCheckTime;
370    }
371
372    public void setWriteCheckTime(long writeCheckTime) {
373        this.writeCheckTime = writeCheckTime;
374    }
375
376    public long getInitialDelayTime() {
377        return initialDelayTime;
378    }
379
380    public void setInitialDelayTime(long initialDelayTime) {
381        this.initialDelayTime = initialDelayTime;
382    }
383
384    public boolean isKeepAliveResponseRequired() {
385        return this.keepAliveResponseRequired;
386    }
387
388    public void setKeepAliveResponseRequired(boolean value) {
389        this.keepAliveResponseRequired = value;
390    }
391
392    public boolean isMonitorStarted() {
393        return this.monitorStarted.get();
394    }
395
396    abstract protected boolean configuredOk() throws IOException;
397
398    public synchronized void startConnectCheckTask() {
399        startConnectCheckTask(getConnectAttemptTimeout());
400    }
401
402    public synchronized void startConnectCheckTask(long connectionTimeout) {
403        if (connectionTimeout <= 0) {
404            return;
405        }
406
407        LOG.trace("Starting connection check task for: {}", this);
408
409        this.connectAttemptTimeout = connectionTimeout;
410
411        if (connectCheckerTask == null) {
412            connectCheckerTask = new SchedulerTimerTask(connectChecker);
413
414            synchronized (AbstractInactivityMonitor.class) {
415                if (CHECKER_COUNTER == 0) {
416                    if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
417                        ASYNC_TASKS = createExecutor();
418                    }
419                    if (READ_CHECK_TIMER == null) {
420                        READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true);
421                    }
422                }
423                CHECKER_COUNTER++;
424                READ_CHECK_TIMER.schedule(connectCheckerTask, connectionTimeout);
425            }
426        }
427    }
428
429    public synchronized void stopConnectCheckTask() {
430        if (connectCheckerTask != null) {
431            LOG.trace("Stopping connection check task for: {}", this);
432            connectCheckerTask.cancel();
433            connectCheckerTask = null;
434
435            synchronized (AbstractInactivityMonitor.class) {
436                READ_CHECK_TIMER.purge();
437                CHECKER_COUNTER--;
438            }
439        }
440    }
441
442    protected synchronized void startMonitorThreads() throws IOException {
443        if (monitorStarted.get()) {
444            return;
445        }
446
447        if (!configuredOk()) {
448            return;
449        }
450
451        if (readCheckTime > 0) {
452            readCheckerTask = new SchedulerTimerTask(readChecker);
453        }
454
455        if (writeCheckTime > 0) {
456            writeCheckerTask = new SchedulerTimerTask(writeChecker);
457        }
458
459        if (writeCheckTime > 0 || readCheckTime > 0) {
460            monitorStarted.set(true);
461            synchronized (AbstractInactivityMonitor.class) {
462                if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
463                    ASYNC_TASKS = createExecutor();
464                }
465                if (READ_CHECK_TIMER == null) {
466                    READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true);
467                }
468                if (WRITE_CHECK_TIMER == null) {
469                    WRITE_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor WriteCheckTimer", true);
470                }
471
472                CHECKER_COUNTER++;
473                if (readCheckTime > 0) {
474                    READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
475                }
476                if (writeCheckTime > 0) {
477                    WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
478                }
479            }
480        }
481    }
482
483    protected synchronized void stopMonitorThreads() {
484        stopConnectCheckTask();
485        if (monitorStarted.compareAndSet(true, false)) {
486            if (readCheckerTask != null) {
487                readCheckerTask.cancel();
488            }
489            if (writeCheckerTask != null) {
490                writeCheckerTask.cancel();
491            }
492
493            synchronized (AbstractInactivityMonitor.class) {
494                WRITE_CHECK_TIMER.purge();
495                READ_CHECK_TIMER.purge();
496                CHECKER_COUNTER--;
497                if (CHECKER_COUNTER == 0) {
498                    WRITE_CHECK_TIMER.cancel();
499                    READ_CHECK_TIMER.cancel();
500                    WRITE_CHECK_TIMER = null;
501                    READ_CHECK_TIMER = null;
502                }
503            }
504        }
505    }
506
507    private final ThreadFactory factory = new ThreadFactory() {
508        @Override
509        public Thread newThread(Runnable runnable) {
510            Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker");
511            thread.setDaemon(true);
512            return thread;
513        }
514    };
515
516    private ThreadPoolExecutor createExecutor() {
517        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
518        exec.allowCoreThreadTimeOut(true);
519        return exec;
520    }
521
522    private static int getDefaultKeepAliveTime() {
523        return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", 30);
524    }
525}