001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import java.io.IOException;
020    import java.util.Timer;
021    import java.util.concurrent.RejectedExecutionException;
022    import java.util.concurrent.SynchronousQueue;
023    import java.util.concurrent.ThreadFactory;
024    import java.util.concurrent.ThreadPoolExecutor;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicInteger;
028    import java.util.concurrent.locks.ReentrantReadWriteLock;
029    
030    import org.apache.activemq.command.KeepAliveInfo;
031    import org.apache.activemq.command.WireFormatInfo;
032    import org.apache.activemq.thread.SchedulerTimerTask;
033    import org.apache.activemq.util.ThreadPoolUtils;
034    import org.apache.activemq.wireformat.WireFormat;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * Used to make sure that commands are arriving periodically from the peer of
040     * the transport.
041     */
042    public abstract class AbstractInactivityMonitor extends TransportFilter {
043    
044        private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class);
045    
046        private static ThreadPoolExecutor ASYNC_TASKS;
047        private static int CHECKER_COUNTER;
048        private static long DEFAULT_CHECK_TIME_MILLS = 30000;
049        private static Timer READ_CHECK_TIMER;
050        private static Timer WRITE_CHECK_TIMER;
051    
052        private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
053    
054        private final AtomicBoolean commandSent = new AtomicBoolean(false);
055        private final AtomicBoolean inSend = new AtomicBoolean(false);
056        private final AtomicBoolean failed = new AtomicBoolean(false);
057    
058        private final AtomicBoolean commandReceived = new AtomicBoolean(true);
059        private final AtomicBoolean inReceive = new AtomicBoolean(false);
060        private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
061    
062        private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
063    
064        private SchedulerTimerTask writeCheckerTask;
065        private SchedulerTimerTask readCheckerTask;
066    
067        private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
068        private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
069        private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
070        private boolean useKeepAlive = true;
071        private boolean keepAliveResponseRequired;
072    
073        protected WireFormat wireFormat;
074    
075        private final Runnable readChecker = new Runnable() {
076            long lastRunTime;
077    
078            @Override
079            public void run() {
080                long now = System.currentTimeMillis();
081                long elapsed = (now - lastRunTime);
082    
083                if (lastRunTime != 0 && LOG.isDebugEnabled()) {
084                    LOG.debug("" + elapsed + " ms elapsed since last read check.");
085                }
086    
087                // Perhaps the timer executed a read check late.. and then executes
088                // the next read check on time which causes the time elapsed between
089                // read checks to be small..
090    
091                // If less than 90% of the read check Time elapsed then abort this
092                // readcheck.
093                if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me
094                                                // to inline this expression.
095                    LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
096                    return;
097                }
098    
099                lastRunTime = now;
100                readCheck();
101            }
102    
103            @Override
104            public String toString() {
105                return "ReadChecker";
106            }
107        };
108    
109        private boolean allowReadCheck(long elapsed) {
110            return elapsed > (readCheckTime * 9 / 10);
111        }
112    
113        private final Runnable writeChecker = new Runnable() {
114            long lastRunTime;
115    
116            @Override
117            public void run() {
118                long now = System.currentTimeMillis();
119                if (lastRunTime != 0 && LOG.isDebugEnabled()) {
120                    LOG.debug(this + " " + (now - lastRunTime) + " ms elapsed since last write check.");
121    
122                }
123                lastRunTime = now;
124                writeCheck();
125            }
126    
127            @Override
128            public String toString() {
129                return "WriteChecker";
130            }
131        };
132    
133        public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
134            super(next);
135            this.wireFormat = wireFormat;
136        }
137    
138        @Override
139        public void start() throws Exception {
140            next.start();
141            startMonitorThreads();
142        }
143    
144        @Override
145        public void stop() throws Exception {
146            stopMonitorThreads();
147            next.stop();
148        }
149    
150        final void writeCheck() {
151            if (inSend.get()) {
152                if (LOG.isTraceEnabled()) {
153                    LOG.trace("A send is in progress");
154                }
155                return;
156            }
157    
158            if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
159    
160                if (LOG.isTraceEnabled()) {
161                    LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
162                }
163    
164                try {
165                    ASYNC_TASKS.execute(new Runnable() {
166                        @Override
167                        public void run() {
168                            if (LOG.isDebugEnabled()) {
169                                LOG.debug("Running {}", this);
170                            }
171                            if (monitorStarted.get()) {
172                                try {
173                                    // If we can't get the lock it means another
174                                    // write beat us into the
175                                    // send and we don't need to heart beat now.
176                                    if (sendLock.writeLock().tryLock()) {
177                                        KeepAliveInfo info = new KeepAliveInfo();
178                                        info.setResponseRequired(keepAliveResponseRequired);
179                                        doOnewaySend(info);
180                                    }
181                                } catch (IOException e) {
182                                    onException(e);
183                                } finally {
184                                    if (sendLock.writeLock().isHeldByCurrentThread()) {
185                                        sendLock.writeLock().unlock();
186                                    }
187                                }
188                            }
189                        }
190    
191                        @Override
192                        public String toString() {
193                            return "WriteCheck[" + getRemoteAddress() + "]";
194                        };
195                    });
196                } catch (RejectedExecutionException ex) {
197                    if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
198                        LOG.error("Async write check was rejected from the executor: ", ex);
199                        throw ex;
200                    }
201                }
202            } else {
203                if (LOG.isTraceEnabled()) {
204                    LOG.trace(this + " message sent since last write check, resetting flag");
205                }
206            }
207    
208            commandSent.set(false);
209        }
210    
211        final void readCheck() {
212            int currentCounter = next.getReceiveCounter();
213            int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
214            if (inReceive.get() || currentCounter != previousCounter) {
215                if (LOG.isTraceEnabled()) {
216                    LOG.trace("A receive is in progress");
217                }
218                return;
219            }
220            if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
221    
222                if (LOG.isDebugEnabled()) {
223                    LOG.debug("No message received since last read check for " + toString() + ". Throwing InactivityIOException.");
224                }
225    
226                try {
227                    ASYNC_TASKS.execute(new Runnable() {
228                        @Override
229                        public void run() {
230                            if (LOG.isDebugEnabled()) {
231                                LOG.debug("Running {}", this);
232                            }
233                            onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
234                        }
235    
236                        @Override
237                        public String toString() {
238                            return "ReadCheck[" + getRemoteAddress() + "]";
239                        };
240                    });
241                } catch (RejectedExecutionException ex) {
242                    if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
243                        LOG.error("Async read check was rejected from the executor: ", ex);
244                        throw ex;
245                    }
246                }
247            } else {
248                if (LOG.isTraceEnabled()) {
249                    LOG.trace("Message received since last read check, resetting flag: ");
250                }
251            }
252            commandReceived.set(false);
253        }
254    
255        protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException;
256    
257        protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException;
258    
259        @Override
260        public void onCommand(Object command) {
261            commandReceived.set(true);
262            inReceive.set(true);
263            try {
264                if (command.getClass() == KeepAliveInfo.class) {
265                    KeepAliveInfo info = (KeepAliveInfo) command;
266                    if (info.isResponseRequired()) {
267                        sendLock.readLock().lock();
268                        try {
269                            info.setResponseRequired(false);
270                            oneway(info);
271                        } catch (IOException e) {
272                            onException(e);
273                        } finally {
274                            sendLock.readLock().unlock();
275                        }
276                    }
277                } else {
278                    if (command.getClass() == WireFormatInfo.class) {
279                        synchronized (this) {
280                            try {
281                                processInboundWireFormatInfo((WireFormatInfo) command);
282                            } catch (IOException e) {
283                                onException(e);
284                            }
285                        }
286                    }
287    
288                    transportListener.onCommand(command);
289                }
290            } finally {
291                inReceive.set(false);
292            }
293        }
294    
295        @Override
296        public void oneway(Object o) throws IOException {
297            // To prevent the inactivity monitor from sending a message while we
298            // are performing a send we take a read lock. The inactivity monitor
299            // sends its Heart-beat commands under a write lock. This means that
300            // the MutexTransport is still responsible for synchronizing sends
301            this.sendLock.readLock().lock();
302            inSend.set(true);
303            try {
304                doOnewaySend(o);
305            } finally {
306                commandSent.set(true);
307                inSend.set(false);
308                this.sendLock.readLock().unlock();
309            }
310        }
311    
312        // Must be called under lock, either read or write on sendLock.
313        private void doOnewaySend(Object command) throws IOException {
314            if (failed.get()) {
315                throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
316            }
317            if (command.getClass() == WireFormatInfo.class) {
318                synchronized (this) {
319                    processOutboundWireFormatInfo((WireFormatInfo) command);
320                }
321            }
322            next.oneway(command);
323        }
324    
325        @Override
326        public void onException(IOException error) {
327            if (failed.compareAndSet(false, true)) {
328                stopMonitorThreads();
329                if (sendLock.writeLock().isHeldByCurrentThread()) {
330                    sendLock.writeLock().unlock();
331                }
332                transportListener.onException(error);
333            }
334        }
335    
336        public void setUseKeepAlive(boolean val) {
337            useKeepAlive = val;
338        }
339    
340        public long getReadCheckTime() {
341            return readCheckTime;
342        }
343    
344        public void setReadCheckTime(long readCheckTime) {
345            this.readCheckTime = readCheckTime;
346        }
347    
348        public long getWriteCheckTime() {
349            return writeCheckTime;
350        }
351    
352        public void setWriteCheckTime(long writeCheckTime) {
353            this.writeCheckTime = writeCheckTime;
354        }
355    
356        public long getInitialDelayTime() {
357            return initialDelayTime;
358        }
359    
360        public void setInitialDelayTime(long initialDelayTime) {
361            this.initialDelayTime = initialDelayTime;
362        }
363    
364        public boolean isKeepAliveResponseRequired() {
365            return this.keepAliveResponseRequired;
366        }
367    
368        public void setKeepAliveResponseRequired(boolean value) {
369            this.keepAliveResponseRequired = value;
370        }
371    
372        public boolean isMonitorStarted() {
373            return this.monitorStarted.get();
374        }
375    
376        protected synchronized void startMonitorThreads() throws IOException {
377            if (monitorStarted.get()) {
378                return;
379            }
380    
381            if (!configuredOk()) {
382                return;
383            }
384    
385            if (readCheckTime > 0) {
386                readCheckerTask = new SchedulerTimerTask(readChecker);
387            }
388    
389            if (writeCheckTime > 0) {
390                writeCheckerTask = new SchedulerTimerTask(writeChecker);
391            }
392    
393            if (writeCheckTime > 0 || readCheckTime > 0) {
394                monitorStarted.set(true);
395                synchronized (AbstractInactivityMonitor.class) {
396                    if (CHECKER_COUNTER == 0) {
397                        ASYNC_TASKS = createExecutor();
398                        READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true);
399                        WRITE_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor WriteCheckTimer", true);
400                    }
401                    CHECKER_COUNTER++;
402                    if (readCheckTime > 0) {
403                        READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
404                    }
405                    if (writeCheckTime > 0) {
406                        WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
407                    }
408                }
409            }
410        }
411    
412        abstract protected boolean configuredOk() throws IOException;
413    
414        protected synchronized void stopMonitorThreads() {
415            if (monitorStarted.compareAndSet(true, false)) {
416                if (readCheckerTask != null) {
417                    readCheckerTask.cancel();
418                }
419                if (writeCheckerTask != null) {
420                    writeCheckerTask.cancel();
421                }
422                synchronized (AbstractInactivityMonitor.class) {
423                    WRITE_CHECK_TIMER.purge();
424                    READ_CHECK_TIMER.purge();
425                    CHECKER_COUNTER--;
426                    if (CHECKER_COUNTER == 0) {
427                        WRITE_CHECK_TIMER.cancel();
428                        READ_CHECK_TIMER.cancel();
429                        WRITE_CHECK_TIMER = null;
430                        READ_CHECK_TIMER = null;
431                        ThreadPoolUtils.shutdown(ASYNC_TASKS);
432                    }
433                }
434            }
435        }
436    
437        private final ThreadFactory factory = new ThreadFactory() {
438            @Override
439            public Thread newThread(Runnable runnable) {
440                Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker");
441                thread.setDaemon(true);
442                return thread;
443            }
444        };
445    
446        private ThreadPoolExecutor createExecutor() {
447            // TODO: This value of 10 seconds seems to low, see discussion at
448            // http://activemq.2283324.n4.nabble.com/InactivityMonitor-Creating-too-frequent-threads-tp4656752.html;cid=1348142445209-351
449            ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
450            exec.allowCoreThreadTimeOut(true);
451            return exec;
452        }
453    }