001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import java.io.IOException;
020    import java.util.Timer;
021    import java.util.concurrent.RejectedExecutionException;
022    import java.util.concurrent.SynchronousQueue;
023    import java.util.concurrent.ThreadFactory;
024    import java.util.concurrent.ThreadPoolExecutor;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicInteger;
028    import java.util.concurrent.locks.ReentrantReadWriteLock;
029    
030    import org.apache.activemq.command.KeepAliveInfo;
031    import org.apache.activemq.command.WireFormatInfo;
032    import org.apache.activemq.thread.SchedulerTimerTask;
033    import org.apache.activemq.util.ThreadPoolUtils;
034    import org.apache.activemq.wireformat.WireFormat;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * Used to make sure that commands are arriving periodically from the peer of
040     * the transport.
041     */
042    public abstract class AbstractInactivityMonitor extends TransportFilter {
043    
044        private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class);
045    
046        private static ThreadPoolExecutor ASYNC_TASKS;
047        private static int CHECKER_COUNTER;
048        private static long DEFAULT_CHECK_TIME_MILLS = 30000;
049        private static Timer READ_CHECK_TIMER;
050        private static Timer WRITE_CHECK_TIMER;
051    
052        private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
053    
054        private final AtomicBoolean commandSent = new AtomicBoolean(false);
055        private final AtomicBoolean inSend = new AtomicBoolean(false);
056        private final AtomicBoolean failed = new AtomicBoolean(false);
057    
058        private final AtomicBoolean commandReceived = new AtomicBoolean(true);
059        private final AtomicBoolean inReceive = new AtomicBoolean(false);
060        private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
061    
062        private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
063    
064        private SchedulerTimerTask writeCheckerTask;
065        private SchedulerTimerTask readCheckerTask;
066    
067        private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
068        private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
069        private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
070        private boolean useKeepAlive = true;
071        private boolean keepAliveResponseRequired;
072    
073        protected WireFormat wireFormat;
074    
075        private final Runnable readChecker = new Runnable() {
076            long lastRunTime;
077    
078            @Override
079            public void run() {
080                long now = System.currentTimeMillis();
081                long elapsed = (now - lastRunTime);
082    
083                if (lastRunTime != 0) {
084                    LOG.debug("{}ms elapsed since last read check.", elapsed);
085                }
086    
087                // Perhaps the timer executed a read check late.. and then executes
088                // the next read check on time which causes the time elapsed between
089                // read checks to be small..
090    
091                // If less than 90% of the read check Time elapsed then abort this
092                // read check.
093                if (!allowReadCheck(elapsed)) {
094                    LOG.debug("Aborting read check...Not enough time elapsed since last read check.");
095                    return;
096                }
097    
098                lastRunTime = now;
099                readCheck();
100            }
101    
102            @Override
103            public String toString() {
104                return "ReadChecker";
105            }
106        };
107    
108        private boolean allowReadCheck(long elapsed) {
109            return elapsed > (readCheckTime * 9 / 10);
110        }
111    
112        private final Runnable writeChecker = new Runnable() {
113            long lastRunTime;
114    
115            @Override
116            public void run() {
117                long now = System.currentTimeMillis();
118                if (lastRunTime != 0) {
119                    LOG.debug("{}: {}ms elapsed since last write check.", this, (now - lastRunTime));
120                }
121                lastRunTime = now;
122                writeCheck();
123            }
124    
125            @Override
126            public String toString() {
127                return "WriteChecker";
128            }
129        };
130    
131        public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
132            super(next);
133            this.wireFormat = wireFormat;
134        }
135    
136        @Override
137        public void start() throws Exception {
138            next.start();
139            startMonitorThreads();
140        }
141    
142        @Override
143        public void stop() throws Exception {
144            stopMonitorThreads();
145            next.stop();
146        }
147    
148        final void writeCheck() {
149            if (inSend.get()) {
150                LOG.trace("Send in progress. Skipping write check.");
151                return;
152            }
153    
154            if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
155                LOG.trace("{} no message sent since last write check, sending a KeepAliveInfo", this);
156    
157                try {
158                    ASYNC_TASKS.execute(new Runnable() {
159                        @Override
160                        public void run() {
161                            LOG.debug("Running {}", this);
162                            if (monitorStarted.get()) {
163                                try {
164                                    // If we can't get the lock it means another
165                                    // write beat us into the
166                                    // send and we don't need to heart beat now.
167                                    if (sendLock.writeLock().tryLock()) {
168                                        KeepAliveInfo info = new KeepAliveInfo();
169                                        info.setResponseRequired(keepAliveResponseRequired);
170                                        doOnewaySend(info);
171                                    }
172                                } catch (IOException e) {
173                                    onException(e);
174                                } finally {
175                                    if (sendLock.writeLock().isHeldByCurrentThread()) {
176                                        sendLock.writeLock().unlock();
177                                    }
178                                }
179                            }
180                        }
181    
182                        @Override
183                        public String toString() {
184                            return "WriteCheck[" + getRemoteAddress() + "]";
185                        };
186                    });
187                } catch (RejectedExecutionException ex) {
188                    if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
189                        LOG.error("Async write check was rejected from the executor: ", ex);
190                        throw ex;
191                    }
192                }
193            } else {
194                LOG.trace("{} message sent since last write check, resetting flag.", this);
195            }
196    
197            commandSent.set(false);
198        }
199    
200        final void readCheck() {
201            int currentCounter = next.getReceiveCounter();
202            int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
203            if (inReceive.get() || currentCounter != previousCounter) {
204                LOG.trace("A receive is in progress, skipping read check.");
205                return;
206            }
207            if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
208                LOG.debug("No message received since last read check for {}. Throwing InactivityIOException.", this);
209    
210                try {
211                    ASYNC_TASKS.execute(new Runnable() {
212                        @Override
213                        public void run() {
214                            LOG.debug("Running {}", this);
215                            onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
216                        }
217    
218                        @Override
219                        public String toString() {
220                            return "ReadCheck[" + getRemoteAddress() + "]";
221                        };
222                    });
223                } catch (RejectedExecutionException ex) {
224                    if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
225                        LOG.error("Async read check was rejected from the executor: ", ex);
226                        throw ex;
227                    }
228                }
229            } else {
230                if (LOG.isTraceEnabled()) {
231                    LOG.trace("Message received since last read check, resetting flag: ");
232                }
233            }
234            commandReceived.set(false);
235        }
236    
237        protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException;
238    
239        protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException;
240    
241        @Override
242        public void onCommand(Object command) {
243            commandReceived.set(true);
244            inReceive.set(true);
245            try {
246                if (command.getClass() == KeepAliveInfo.class) {
247                    KeepAliveInfo info = (KeepAliveInfo) command;
248                    if (info.isResponseRequired()) {
249                        sendLock.readLock().lock();
250                        try {
251                            info.setResponseRequired(false);
252                            oneway(info);
253                        } catch (IOException e) {
254                            onException(e);
255                        } finally {
256                            sendLock.readLock().unlock();
257                        }
258                    }
259                } else {
260                    if (command.getClass() == WireFormatInfo.class) {
261                        synchronized (this) {
262                            try {
263                                processInboundWireFormatInfo((WireFormatInfo) command);
264                            } catch (IOException e) {
265                                onException(e);
266                            }
267                        }
268                    }
269    
270                    transportListener.onCommand(command);
271                }
272            } finally {
273                inReceive.set(false);
274            }
275        }
276    
277        @Override
278        public void oneway(Object o) throws IOException {
279            // To prevent the inactivity monitor from sending a message while we
280            // are performing a send we take a read lock. The inactivity monitor
281            // sends its Heart-beat commands under a write lock. This means that
282            // the MutexTransport is still responsible for synchronizing sends
283            this.sendLock.readLock().lock();
284            inSend.set(true);
285            try {
286                doOnewaySend(o);
287            } finally {
288                commandSent.set(true);
289                inSend.set(false);
290                this.sendLock.readLock().unlock();
291            }
292        }
293    
294        // Must be called under lock, either read or write on sendLock.
295        private void doOnewaySend(Object command) throws IOException {
296            if (failed.get()) {
297                throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
298            }
299            if (command.getClass() == WireFormatInfo.class) {
300                synchronized (this) {
301                    processOutboundWireFormatInfo((WireFormatInfo) command);
302                }
303            }
304            next.oneway(command);
305        }
306    
307        @Override
308        public void onException(IOException error) {
309            if (failed.compareAndSet(false, true)) {
310                stopMonitorThreads();
311                if (sendLock.writeLock().isHeldByCurrentThread()) {
312                    sendLock.writeLock().unlock();
313                }
314                transportListener.onException(error);
315            }
316        }
317    
318        public void setUseKeepAlive(boolean val) {
319            useKeepAlive = val;
320        }
321    
322        public long getReadCheckTime() {
323            return readCheckTime;
324        }
325    
326        public void setReadCheckTime(long readCheckTime) {
327            this.readCheckTime = readCheckTime;
328        }
329    
330        public long getWriteCheckTime() {
331            return writeCheckTime;
332        }
333    
334        public void setWriteCheckTime(long writeCheckTime) {
335            this.writeCheckTime = writeCheckTime;
336        }
337    
338        public long getInitialDelayTime() {
339            return initialDelayTime;
340        }
341    
342        public void setInitialDelayTime(long initialDelayTime) {
343            this.initialDelayTime = initialDelayTime;
344        }
345    
346        public boolean isKeepAliveResponseRequired() {
347            return this.keepAliveResponseRequired;
348        }
349    
350        public void setKeepAliveResponseRequired(boolean value) {
351            this.keepAliveResponseRequired = value;
352        }
353    
354        public boolean isMonitorStarted() {
355            return this.monitorStarted.get();
356        }
357    
358        protected synchronized void startMonitorThreads() throws IOException {
359            if (monitorStarted.get()) {
360                return;
361            }
362    
363            if (!configuredOk()) {
364                return;
365            }
366    
367            if (readCheckTime > 0) {
368                readCheckerTask = new SchedulerTimerTask(readChecker);
369            }
370    
371            if (writeCheckTime > 0) {
372                writeCheckerTask = new SchedulerTimerTask(writeChecker);
373            }
374    
375            if (writeCheckTime > 0 || readCheckTime > 0) {
376                monitorStarted.set(true);
377                synchronized (AbstractInactivityMonitor.class) {
378                    if (CHECKER_COUNTER == 0) {
379                        ASYNC_TASKS = createExecutor();
380                        READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true);
381                        WRITE_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor WriteCheckTimer", true);
382                    }
383                    CHECKER_COUNTER++;
384                    if (readCheckTime > 0) {
385                        READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
386                    }
387                    if (writeCheckTime > 0) {
388                        WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
389                    }
390                }
391            }
392        }
393    
394        abstract protected boolean configuredOk() throws IOException;
395    
396        protected synchronized void stopMonitorThreads() {
397            if (monitorStarted.compareAndSet(true, false)) {
398                if (readCheckerTask != null) {
399                    readCheckerTask.cancel();
400                }
401                if (writeCheckerTask != null) {
402                    writeCheckerTask.cancel();
403                }
404                synchronized (AbstractInactivityMonitor.class) {
405                    WRITE_CHECK_TIMER.purge();
406                    READ_CHECK_TIMER.purge();
407                    CHECKER_COUNTER--;
408                    if (CHECKER_COUNTER == 0) {
409                        WRITE_CHECK_TIMER.cancel();
410                        READ_CHECK_TIMER.cancel();
411                        WRITE_CHECK_TIMER = null;
412                        READ_CHECK_TIMER = null;
413                        ThreadPoolUtils.shutdown(ASYNC_TASKS);
414                    }
415                }
416            }
417        }
418    
419        private final ThreadFactory factory = new ThreadFactory() {
420            @Override
421            public Thread newThread(Runnable runnable) {
422                Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker");
423                thread.setDaemon(true);
424                return thread;
425            }
426        };
427    
428        private ThreadPoolExecutor createExecutor() {
429            ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
430            exec.allowCoreThreadTimeOut(true);
431            return exec;
432        }
433    
434        private static int getDefaultKeepAliveTime() {
435            return Integer.getInteger("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", 30);
436        }
437    }