001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transport;
018    
019    import java.io.IOException;
020    import java.util.Timer;
021    import java.util.concurrent.SynchronousQueue;
022    import java.util.concurrent.ThreadFactory;
023    import java.util.concurrent.ThreadPoolExecutor;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    import java.util.concurrent.atomic.AtomicInteger;
027    import java.util.concurrent.locks.ReentrantReadWriteLock;
028    
029    import org.apache.activemq.command.KeepAliveInfo;
030    import org.apache.activemq.command.WireFormatInfo;
031    import org.apache.activemq.thread.SchedulerTimerTask;
032    import org.apache.activemq.util.ThreadPoolUtils;
033    import org.apache.activemq.wireformat.WireFormat;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    
037    /**
038     * Used to make sure that commands are arriving periodically from the peer of
039     * the transport.
040     */
041    public abstract class AbstractInactivityMonitor extends TransportFilter {
042    
043        private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class);
044    
045        private static ThreadPoolExecutor ASYNC_TASKS;
046        private static int CHECKER_COUNTER;
047        private static long DEFAULT_CHECK_TIME_MILLS = 30000;
048        private static Timer READ_CHECK_TIMER;
049        private static Timer WRITE_CHECK_TIMER;
050    
051        private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
052    
053        private final AtomicBoolean commandSent = new AtomicBoolean(false);
054        private final AtomicBoolean inSend = new AtomicBoolean(false);
055        private final AtomicBoolean failed = new AtomicBoolean(false);
056    
057        private final AtomicBoolean commandReceived = new AtomicBoolean(true);
058        private final AtomicBoolean inReceive = new AtomicBoolean(false);
059        private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
060    
061        private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
062    
063        private SchedulerTimerTask writeCheckerTask;
064        private SchedulerTimerTask readCheckerTask;
065    
066        private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
067        private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
068        private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
069        private boolean useKeepAlive = true;
070        private boolean keepAliveResponseRequired;
071    
072        protected WireFormat wireFormat;
073    
074        private final Runnable readChecker = new Runnable() {
075            long lastRunTime;
076            public void run() {
077                long now = System.currentTimeMillis();
078                long elapsed = (now-lastRunTime);
079    
080                if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
081                    LOG.debug(""+elapsed+" ms elapsed since last read check.");
082                }
083    
084                // Perhaps the timer executed a read check late.. and then executes
085                // the next read check on time which causes the time elapsed between
086                // read checks to be small..
087    
088                // If less than 90% of the read check Time elapsed then abort this readcheck.
089                if( !allowReadCheck(elapsed) ) { // FUNKY qdox bug does not allow me to inline this expression.
090                    LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
091                    return;
092                }
093    
094                lastRunTime = now;
095                readCheck();
096            }
097    
098            @Override
099            public String toString() {
100                return "ReadChecker";
101            }
102        };
103    
104        private boolean allowReadCheck(long elapsed) {
105            return elapsed > (readCheckTime * 9 / 10);
106        }
107    
108        private final Runnable writeChecker = new Runnable() {
109            long lastRunTime;
110            public void run() {
111                long now = System.currentTimeMillis();
112                if( lastRunTime != 0 && LOG.isDebugEnabled() ) {
113                    LOG.debug(this + " "+(now-lastRunTime)+" ms elapsed since last write check.");
114    
115                }
116                lastRunTime = now;
117                writeCheck();
118            }
119    
120            @Override
121            public String toString() {
122                return "WriteChecker";
123            }
124        };
125    
126        public AbstractInactivityMonitor(Transport next, WireFormat wireFormat) {
127            super(next);
128            this.wireFormat = wireFormat;
129        }
130    
131        public void start() throws Exception {
132            next.start();
133            startMonitorThreads();
134        }
135    
136        public void stop() throws Exception {
137            stopMonitorThreads();
138            next.stop();
139        }
140    
141        final void writeCheck() {
142            if (inSend.get()) {
143                if (LOG.isTraceEnabled()) {
144                    LOG.trace("A send is in progress");
145                }
146                return;
147            }
148    
149            if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
150                if (LOG.isTraceEnabled()) {
151                    LOG.trace(this + " no message sent since last write check, sending a KeepAliveInfo");
152                }
153                ASYNC_TASKS.execute(new Runnable() {
154                    public void run() {
155                        if (LOG.isDebugEnabled()) {
156                            LOG.debug("Running {}", this);
157                        }
158                        if (monitorStarted.get()) {
159                            try {
160                                // If we can't get the lock it means another write beat us into the
161                                // send and we don't need to heart beat now.
162                                if (sendLock.writeLock().tryLock()) {
163                                    KeepAliveInfo info = new KeepAliveInfo();
164                                    info.setResponseRequired(keepAliveResponseRequired);
165                                    doOnewaySend(info);
166                                }
167                            } catch (IOException e) {
168                                onException(e);
169                            } finally {
170                                 if (sendLock.writeLock().isHeldByCurrentThread()) {
171                                    sendLock.writeLock().unlock();
172                                 }
173                            }
174                        }
175                    }
176    
177                    @Override
178                    public String toString() {
179                        return "WriteCheck[" + getRemoteAddress() + "]";
180                    };
181                });
182            } else {
183                if (LOG.isTraceEnabled()) {
184                    LOG.trace(this + " message sent since last write check, resetting flag");
185                }
186            }
187    
188            commandSent.set(false);
189        }
190    
191        final void readCheck() {
192            int currentCounter = next.getReceiveCounter();
193            int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
194            if (inReceive.get() || currentCounter!=previousCounter ) {
195                if (LOG.isTraceEnabled()) {
196                    LOG.trace("A receive is in progress");
197                }
198                return;
199            }
200            if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
201                if (LOG.isDebugEnabled()) {
202                    LOG.debug("No message received since last read check for " + toString() + ". Throwing InactivityIOException.");
203                }
204                ASYNC_TASKS.execute(new Runnable() {
205                    public void run() {
206                        if (LOG.isDebugEnabled()) {
207                            LOG.debug("Running {}", this);
208                        }
209                        onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: "+next.getRemoteAddress()));
210                    }
211    
212                    @Override
213                    public String toString() {
214                        return "ReadCheck[" + getRemoteAddress() + "]";
215                    };
216                });
217            } else {
218                if (LOG.isTraceEnabled()) {
219                    LOG.trace("Message received since last read check, resetting flag: ");
220                }
221            }
222            commandReceived.set(false);
223        }
224    
225        protected abstract void processInboundWireFormatInfo(WireFormatInfo info) throws IOException;
226        protected abstract void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException;
227    
228        public void onCommand(Object command) {
229            commandReceived.set(true);
230            inReceive.set(true);
231            try {
232                if (command.getClass() == KeepAliveInfo.class) {
233                    KeepAliveInfo info = (KeepAliveInfo) command;
234                    if (info.isResponseRequired()) {
235                        sendLock.readLock().lock();
236                        try {
237                            info.setResponseRequired(false);
238                            oneway(info);
239                        } catch (IOException e) {
240                            onException(e);
241                        } finally {
242                            sendLock.readLock().unlock();
243                        }
244                    }
245                } else {
246                    if (command.getClass() == WireFormatInfo.class) {
247                        synchronized (this) {
248                            try {
249                                processInboundWireFormatInfo((WireFormatInfo) command);
250                            } catch (IOException e) {
251                                onException(e);
252                            }
253                        }
254                    }
255    
256                    transportListener.onCommand(command);
257                }
258            } finally {
259                inReceive.set(false);
260            }
261        }
262    
263        public void oneway(Object o) throws IOException {
264            // To prevent the inactivity monitor from sending a message while we
265            // are performing a send we take a read lock.  The inactivity monitor
266            // sends its Heart-beat commands under a write lock.  This means that
267            // the MutexTransport is still responsible for synchronizing sends
268            this.sendLock.readLock().lock();
269            inSend.set(true);
270            try {
271                doOnewaySend(o);
272            } finally {
273                commandSent.set(true);
274                inSend.set(false);
275                this.sendLock.readLock().unlock();
276            }
277        }
278    
279        // Must be called under lock, either read or write on sendLock.
280        private void doOnewaySend(Object command) throws IOException {
281            if( failed.get() ) {
282                throw new InactivityIOException("Cannot send, channel has already failed: "+next.getRemoteAddress());
283            }
284            if (command.getClass() == WireFormatInfo.class) {
285                synchronized (this) {
286                    processOutboundWireFormatInfo((WireFormatInfo) command);
287                }
288            }
289            next.oneway(command);
290        }
291    
292        public void onException(IOException error) {
293            if (failed.compareAndSet(false, true)) {
294                stopMonitorThreads();
295                transportListener.onException(error);
296            }
297        }
298    
299        public void setUseKeepAlive(boolean val) {
300            useKeepAlive = val;
301        }
302    
303        public long getReadCheckTime() {
304            return readCheckTime;
305        }
306    
307        public void setReadCheckTime(long readCheckTime) {
308            this.readCheckTime = readCheckTime;
309        }
310    
311        public long getWriteCheckTime() {
312            return writeCheckTime;
313        }
314    
315        public void setWriteCheckTime(long writeCheckTime) {
316            this.writeCheckTime = writeCheckTime;
317        }
318    
319        public long getInitialDelayTime() {
320            return initialDelayTime;
321        }
322    
323        public void setInitialDelayTime(long initialDelayTime) {
324            this.initialDelayTime = initialDelayTime;
325        }
326    
327        public boolean isKeepAliveResponseRequired() {
328            return this.keepAliveResponseRequired;
329        }
330    
331        public void setKeepAliveResponseRequired(boolean value) {
332            this.keepAliveResponseRequired = value;
333        }
334    
335        public boolean isMonitorStarted() {
336            return this.monitorStarted.get();
337        }
338    
339        protected synchronized void startMonitorThreads() throws IOException {
340            if (monitorStarted.get()) {
341                return;
342            }
343    
344            if (!configuredOk()) {
345                return;
346            }
347    
348            if (readCheckTime > 0) {
349                readCheckerTask = new SchedulerTimerTask(readChecker);
350            }
351    
352            if (writeCheckTime > 0) {
353                writeCheckerTask = new SchedulerTimerTask(writeChecker);
354            }
355    
356            if (writeCheckTime > 0 || readCheckTime > 0) {
357                monitorStarted.set(true);
358                synchronized(AbstractInactivityMonitor.class) {
359                    if( CHECKER_COUNTER == 0 ) {
360                        ASYNC_TASKS = createExecutor();
361                        READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer",true);
362                        WRITE_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor WriteCheckTimer",true);
363                    }
364                    CHECKER_COUNTER++;
365                    if (readCheckTime > 0) {
366                        READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
367                    }
368                    if (writeCheckTime > 0) {
369                        WRITE_CHECK_TIMER.schedule(writeCheckerTask, initialDelayTime, writeCheckTime);
370                    }
371                }
372            }
373        }
374    
375        abstract protected boolean configuredOk() throws IOException;
376    
377        protected synchronized void stopMonitorThreads() {
378            if (monitorStarted.compareAndSet(true, false)) {
379                if (readCheckerTask != null) {
380                    readCheckerTask.cancel();
381                }
382                if (writeCheckerTask != null) {
383                    writeCheckerTask.cancel();
384                }
385                synchronized( AbstractInactivityMonitor.class ) {
386                    WRITE_CHECK_TIMER.purge();
387                    READ_CHECK_TIMER.purge();
388                    CHECKER_COUNTER--;
389                    if(CHECKER_COUNTER==0) {
390                      WRITE_CHECK_TIMER.cancel();
391                      READ_CHECK_TIMER.cancel();
392                        WRITE_CHECK_TIMER = null;
393                        READ_CHECK_TIMER = null;
394                        ThreadPoolUtils.shutdown(ASYNC_TASKS);
395                        ASYNC_TASKS = null;
396                    }
397                }
398            }
399        }
400    
401        private ThreadFactory factory = new ThreadFactory() {
402            public Thread newThread(Runnable runnable) {
403                Thread thread = new Thread(runnable, "ActiveMQ InactivityMonitor Worker");
404                thread.setDaemon(true);
405                return thread;
406            }
407        };
408    
409        private ThreadPoolExecutor createExecutor() {
410            // TODO: This value of 10 seconds seems to low, see discussion at
411            // http://activemq.2283324.n4.nabble.com/InactivityMonitor-Creating-too-frequent-threads-tp4656752.html;cid=1348142445209-351
412            ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
413            exec.allowCoreThreadTimeOut(true);
414            return exec;
415        }
416    }