/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.transport;
018
019import java.io.IOException;
020import java.util.Timer;
021import java.util.concurrent.RejectedExecutionException;
022import java.util.concurrent.SynchronousQueue;
023import java.util.concurrent.ThreadFactory;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicInteger;
028import java.util.concurrent.locks.ReentrantReadWriteLock;
029
030import org.apache.activemq.command.KeepAliveInfo;
031import org.apache.activemq.command.WireFormatInfo;
032import org.apache.activemq.thread.SchedulerTimerTask;
033import org.apache.activemq.util.ThreadPoolUtils;
034import org.apache.activemq.wireformat.WireFormat;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Used to make sure that commands are arriving periodically from the peer of
040 * the transport.
041 */
042public abstract class AbstractInactivityMonitor extends TransportFilter {
043
044    private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class);
045
046    private static final long DEFAULT_CHECK_TIME_MILLS = 30000;
047
048    private static ThreadPoolExecutor ASYNC_TASKS;
049    private static int CHECKER_COUNTER;
050    private static Timer READ_CHECK_TIMER;
051    private static Timer WRITE_CHECK_TIMER;
052
053    private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
054
055    private final AtomicBoolean commandSent = new AtomicBoolean(false);
056    private final AtomicBoolean inSend = new AtomicBoolean(false);
057    private final AtomicBoolean failed = new AtomicBoolean(false);
058
059    private final AtomicBoolean commandReceived = new AtomicBoolean(true);
060    private final AtomicBoolean inReceive = new AtomicBoolean(false);
061    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
062
063    private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
064
065    private SchedulerTimerTask connectCheckerTask;
066    private SchedulerTimerTask writeCheckerTask;
067    private SchedulerTimerTask readCheckerTask;
068
069    private long connectAttemptTimeout = DEFAULT_CHECK_TIME_MILLS;
070    private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
071    private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
072    private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
073    private boolean useKeepAlive = true;
074    private boolean keepAliveResponseRequired;
075
076    protected WireFormat wireFormat;
077
078    private final Runnable connectChecker = new Runnable() {
079
080        private final long startTime = System.currentTimeMillis();
081
082        @Override
083        public void run() {
084            long now = System.currentTimeMillis();
085
086            if ((now - startTime) >= connectAttemptTimeout && connectCheckerTask != null && !ASYNC_TASKS.isShutdown()) {
087                LOG.debug("No connection attempt made in time for {}! Throwing InactivityIOException.", AbstractInactivityMonitor.this.toString());
088                try {
089                    ASYNC_TASKS.execute(new Runnable() {
090                        @Override
091                        public void run() {
092                            onException(new InactivityIOException(
093                                "Channel was inactive (no connection attempt made) for too (>" + (connectAttemptTimeout) + ") long: " + next.getRemoteAddress()));
094                        }
095                    });
096                } catch (RejectedExecutionException ex) {
097                    if (!ASYNC_TASKS.isShutdown()) {
098                        LOG.error("Async connection timeout task was rejected from the executor: ", ex);
099                        throw ex;
100                    }
101                }
102            }
103        }
104    };
105
106    private final Runnable readChecker = new Runnable() {
107        long lastRunTime;
108
109        @Override
110        public void run() {
111            long now = System.currentTimeMillis();
112            long elapsed = (now - lastRunTime);
113
114            if (lastRunTime != 0) {
115                LOG.debug("{}ms elapsed since last read check.", elapsed);
116            }
117
118            // Perhaps the timer executed a read check late.. and then executes
119            // the next read check on time which causes the time elapsed between
120            // read checks to be small..
121
122            // If less than 90% of the read check Time elapsed then abort this
123            // read check.
124            if (!allowReadCheck(elapsed)) {
125                LOG.debug("Aborting read check...Not enough time elapsed since last read check.");
126                return;
127            }
128
129            lastRunTime = now;
130            readCheck();
131        }
132
133        @Override
134        public String toString() {
135            return "ReadChecker";
136        }
137    };
138
139    private boolean allowReadCheck(long elapsed) {
140        return elapsed > (readCheckTime * 9 / 10);
141    }
142
143    private final Runnable writeChecker = new Runnable() {
144        long lastRunTime;
145
146        @Override
147        public void run() {
148            long now = System.currentTimeMillis();
149            if (lastRunTime != 0) {
150                LOG.debug("{}: {}ms elapsed since last write check.", this, (now - lastRunTime));
151            }
152            lastRunTime = now;
153            writeCheck();
154        }
155
156        @Override
157        public String toString() {
158            return "WriteChecker";
159        }
160    };
161
162    public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
163        super(next);
164        this.wireFormat = wireFormat;
165    }
166
167    @Override
168    public void start() throws Exception {
169        next.start();
170        startMonitorThreads();
171    }
172
173    @Override
174    public void stop() throws Exception {
175        stopMonitorThreads();
176        next.stop();
177    }
178
179    final void writeCheck() {
180        if (inSend.get()) {
181            LOG.trace("Send in progress. Skipping write check.");
182            return;
183        }
184
185        if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isShutdown()) {
186            LOG.trace("{} no message sent since last write check, sending a KeepAliveInfo", this);
187
188            try {
189                ASYNC_TASKS.execute(new Runnable() {
190                    @Override
191                    public void run() {
192                        LOG.debug("Running {}", this);
193                        if (monitorStarted.get()) {
194                            try {
195                                // If we can't get the lock it means another
196                                // write beat us into the
197                                // send and we don't need to heart beat now.
198                                if (sendLock.writeLock().tryLock()) {
199                                    KeepAliveInfo info = new KeepAliveInfo();
200                                    info.setResponseRequired(keepAliveResponseRequired);
201                                    doOnewaySend(info);
202                                }
203                            } catch (IOException e) {
204                                onException(e);
205                            } finally {
206                                if (sendLock.writeLock().isHeldByCurrentThread()) {
207                                    sendLock.writeLock().unlock();
208                                }
209                            }
210                        }
211                    }
212
213                    @Override
214                    public String toString() {
215                        return "WriteCheck[" + getRemoteAddress() + "]";
216                    };
217                });
218            } catch (RejectedExecutionException ex) {
219                if (!ASYNC_TASKS.isShutdown()) {
220                    LOG.error("Async write check was rejected from the executor: ", ex);
221                    throw ex;
222                }
223            }
224        } else {
225            LOG.trace("{} message sent since last write check, resetting flag.", this);
226        }
227
228        commandSent.set(false);
229    }
230
231    final void readCheck() {
232        int currentCounter = next.getReceiveCounter();
233        int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
234        if (inReceive.get() || currentCounter != previousCounter) {
235            LOG.trace("A receive is in progress, skipping read check.");
236            return;
237        }
238        if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isShutdown()) {
239            LOG.debug("No message received since last read check for {}. Throwing InactivityIOException.", this);
240
241            try {
242                ASYNC_TASKS.execute(new Runnable() {
243                    @Override
244                    public void run() {
245                        LOG.debug("Running {}", this);
246                        onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
247                    }
248
249                    @Override
250                    public String toString() {
251                        return "ReadCheck[" + getRemoteAddress() + "]";
252                    };
253                });
254            } catch (RejectedExecutionException ex) {
255                if (!ASYNC_TASKS.isShutdown()) {
256                    LOG.error("Async read check was rejected from the executor: ", ex);
257                    throw ex;
258                }
259            }
260        } else {
261            if (LOG.isTraceEnabled()) {
262                LOG.trace("Message received since last read check, resetting flag: ");
263            }
264        }
265        commandReceived.set(false);
266    }
267
268    protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException;
269
270    protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException;
271
272    @Override
273    public void onCommand(Object command) {
274        commandReceived.set(true);
275        inReceive.set(true);
276        try {
277            if (command.getClass() == KeepAliveInfo.class) {
278                KeepAliveInfo info = (KeepAliveInfo) command;
279                if (info.isResponseRequired()) {
280                    sendLock.readLock().lock();
281                    try {
282                        info.setResponseRequired(false);
283                        oneway(info);
284                    } catch (IOException e) {
285                        onException(e);
286                    } finally {
287                        sendLock.readLock().unlock();
288                    }
289                }
290            } else {
291                if (command.getClass() == WireFormatInfo.class) {
292                    synchronized (this) {
293                        try {
294                            processInboundWireFormatInfo((WireFormatInfo) command);
295                        } catch (IOException e) {
296                            onException(e);
297                        }
298                    }
299                }
300
301                transportListener.onCommand(command);
302            }
303        } finally {
304            inReceive.set(false);
305        }
306    }
307
308    @Override
309    public void oneway(Object o) throws IOException {
310        // To prevent the inactivity monitor from sending a message while we
311        // are performing a send we take a read lock. The inactivity monitor
312        // sends its Heart-beat commands under a write lock. This means that
313        // the MutexTransport is still responsible for synchronizing sends
314        sendLock.readLock().lock();
315        inSend.set(true);
316        try {
317            doOnewaySend(o);
318        } finally {
319            commandSent.set(true);
320            inSend.set(false);
321            sendLock.readLock().unlock();
322        }
323    }
324
325    // Must be called under lock, either read or write on sendLock.
326    private void doOnewaySend(Object command) throws IOException {
327        if (failed.get()) {
328            throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
329        }
330        if (command.getClass() == WireFormatInfo.class) {
331            synchronized (this) {
332                processOutboundWireFormatInfo((WireFormatInfo) command);
333            }
334        }
335        next.oneway(command);
336    }
337
338    @Override
339    public void onException(IOException error) {
340        if (failed.compareAndSet(false, true)) {
341            stopMonitorThreads();
342            if (sendLock.writeLock().isHeldByCurrentThread()) {
343                sendLock.writeLock().unlock();
344            }
345            transportListener.onException(error);
346        }
347    }
348
349    public void setUseKeepAlive(boolean val) {
350        useKeepAlive = val;
351    }
352
353    public long getConnectAttemptTimeout() {
354        return connectAttemptTimeout;
355    }
356
357    public void setConnectAttemptTimeout(long connectionTimeout) {
358        this.connectAttemptTimeout = connectionTimeout;
359    }
360
361    public long getReadCheckTime() {
362        return readCheckTime;
363    }
364
365    public void setReadCheckTime(long readCheckTime) {
366        this.readCheckTime = readCheckTime;
367    }
368
369    public long getWriteCheckTime() {
370        return writeCheckTime;
371    }
372
373    public void setWriteCheckTime(long writeCheckTime) {
374        this.writeCheckTime = writeCheckTime;
375    }
376
377    public long getInitialDelayTime() {
378        return initialDelayTime;
379    }
380
381    public void setInitialDelayTime(long initialDelayTime) {
382        this.initialDelayTime = initialDelayTime;
383    }
384
385    public boolean isKeepAliveResponseRequired() {
386        return this.keepAliveResponseRequired;
387    }
388
389    public void setKeepAliveResponseRequired(boolean value) {
390        this.keepAliveResponseRequired = value;
391    }
392
393    public boolean isMonitorStarted() {
394        return this.monitorStarted.get();
395    }
396
397    abstract protected boolean configuredOk() throws IOException;
398
399    public synchronized void startConnectCheckTask() {
400        startConnectCheckTask(getConnectAttemptTimeout());
401    }
402
403    public synchronized void startConnectCheckTask(long connectionTimeout) {
404        if (connectionTimeout <= 0) {
405            return;
406        }
407
408        LOG.trace("Starting connection check task for: {}", this);
409
410        this.connectAttemptTimeout = connectionTimeout;
411
412        if (connectCheckerTask == null) {
413            connectCheckerTask = new SchedulerTimerTask(connectChecker);
414
415            synchronized (AbstractInactivityMonitor.class) {
416                if (CHECKER_COUNTER == 0) {
417                    if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
418                        ASYNC_TASKS = createExecutor();
419                    }
420                    if (READ_CHECK_TIMER == null) {
421                        READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true);
422                    }
423                }
424                CHECKER_COUNTER++;
425                READ_CHECK_TIMER.schedule(connectCheckerTask, connectionTimeout);
426            }
427        }
428    }
429
430    public synchronized void stopConnectCheckTask() {
431        if (connectCheckerTask != null) {
432            LOG.trace("Stopping connection check task for: {}", this);
433            connectCheckerTask.cancel();
434            connectCheckerTask = null;
435
436            synchronized (AbstractInactivityMonitor.class) {
437                READ_CHECK_TIMER.purge();
438                CHECKER_COUNTER--;
439            }
440        }
441    }
442
443    protected synchronized void startMonitorThreads() throws IOException {
444        if (monitorStarted.get()) {
445            return;
446        }
447
448        if (!configuredOk()) {
449            return;
450        }
451
452        if (readCheckTime > 0) {
453            readCheckerTask = new SchedulerTimerTask(readChecker);
454        }
455
456        if (writeCheckTime > 0) {
457            writeCheckerTask = new SchedulerTimerTask(writeChecker);
458        }
459
460        if (writeCheckTime > 0 || readCheckTime > 0) {
461            monitorStarted.set(true);
462            synchronized (AbstractInactivityMonitor.class) {
463                if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
464                    ASYNC_TASKS = createExecutor();
465                }
466                if (READ_CHECK_TIMER == null) {
467                    READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true);
468                }
469                if (WRITE_CHECK_TIMER == null) {
470                    WRITE_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor WriteCheckTimer", true);
471                }
472
473                CHECKER_COUNTER++;
474                if (readCheckTime > 0) {
475                    READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
476                }
477                if (writeCheckTime > 0) {
478                    WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
479                }
480            }
481        }
482    }
483
484    protected synchronized void stopMonitorThreads() {
485        stopConnectCheckTask();
486        if (monitorStarted.compareAndSet(true, false)) {
487            if (readCheckerTask != null) {
488                readCheckerTask.cancel();
489            }
490            if (writeCheckerTask != null) {
491                writeCheckerTask.cancel();
492            }
493
494            synchronized (AbstractInactivityMonitor.class) {
495                WRITE_CHECK_TIMER.purge();
496                READ_CHECK_TIMER.purge();
497                CHECKER_COUNTER--;
498                if (CHECKER_COUNTER == 0) {
499                    WRITE_CHECK_TIMER.cancel();
500                    READ_CHECK_TIMER.cancel();
501                    WRITE_CHECK_TIMER = null;
502                    READ_CHECK_TIMER = null;
503                    try {
504                        ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, TimeUnit.SECONDS.toMillis(10));
505                    } finally {
506                        ASYNC_TASKS = null;
507                    }
508                }
509            }
510        }
511    }
512
513    private final ThreadFactory factory = new ThreadFactory() {
514        @Override
515        public Thread newThread(Runnable runnable) {
516            Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker");
517            thread.setDaemon(true);
518            return thread;
519        }
520    };
521
522    private ThreadPoolExecutor createExecutor() {
523        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
524        exec.allowCoreThreadTimeOut(true);
525        return exec;
526    }
527
528    private static int getDefaultKeepAliveTime() {
529        return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", 30);
530    }
531}