001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.mqtt;
019    
020    import java.io.IOException;
021    import java.util.Timer;
022    import java.util.concurrent.SynchronousQueue;
023    import java.util.concurrent.ThreadFactory;
024    import java.util.concurrent.ThreadPoolExecutor;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicInteger;
028    import java.util.concurrent.locks.ReentrantLock;
029    
030    import org.apache.activemq.command.KeepAliveInfo;
031    import org.apache.activemq.thread.SchedulerTimerTask;
032    import org.apache.activemq.transport.AbstractInactivityMonitor;
033    import org.apache.activemq.transport.InactivityIOException;
034    import org.apache.activemq.transport.Transport;
035    import org.apache.activemq.transport.TransportFilter;
036    import org.apache.activemq.util.ThreadPoolUtils;
037    import org.apache.activemq.wireformat.WireFormat;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    public class MQTTInactivityMonitor extends TransportFilter {
042    
043        private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
044    
045        private static final long DEFAULT_CHECK_TIME_MILLS = 30000;
046    
047        private static ThreadPoolExecutor ASYNC_TASKS;
048        private static int CHECKER_COUNTER;
049        private static Timer READ_CHECK_TIMER;
050    
051        private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
052        private final AtomicBoolean failed = new AtomicBoolean(false);
053        private final AtomicBoolean commandReceived = new AtomicBoolean(true);
054        private final AtomicBoolean inReceive = new AtomicBoolean(false);
055        private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
056    
057        private final ReentrantLock sendLock = new ReentrantLock();
058        private SchedulerTimerTask readCheckerTask;
059    
060        private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
061        private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
062        private boolean keepAliveResponseRequired;
063        private MQTTProtocolConverter protocolConverter;
064    
065        private final Runnable readChecker = new Runnable() {
066            long lastRunTime;
067    
068            public void run() {
069                long now = System.currentTimeMillis();
070                long elapsed = (now - lastRunTime);
071    
072                if (lastRunTime != 0 && LOG.isDebugEnabled()) {
073                    LOG.debug("" + elapsed + " ms elapsed since last read check.");
074                }
075    
076                // Perhaps the timer executed a read check late.. and then executes
077                // the next read check on time which causes the time elapsed between
078                // read checks to be small..
079    
080                // If less than 90% of the read check Time elapsed then abort this readcheck.
081                if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression.
082                    LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
083                    return;
084                }
085    
086                lastRunTime = now;
087                readCheck();
088            }
089        };
090    
091        private boolean allowReadCheck(long elapsed) {
092            return elapsed > (readCheckTime * 9 / 10);
093        }
094    
095        public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
096            super(next);
097        }
098    
099        public void start() throws Exception {
100            next.start();
101            startMonitorThread();
102        }
103    
104        public void stop() throws Exception {
105            stopMonitorThread();
106            next.stop();
107        }
108    
109        final void readCheck() {
110            int currentCounter = next.getReceiveCounter();
111            int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
112    
113            // for the PINGREQ/RESP frames, the currentCounter will be different from previousCounter, and that
114            // should be sufficient to indicate the connection is still alive. If there were random data, or something
115            // outside the scope of the spec, the wire format unrmarshalling would fail, so we don't need to handle
116            // PINGREQ/RESP explicitly here
117            if (inReceive.get() || currentCounter != previousCounter) {
118                if (LOG.isTraceEnabled()) {
119                    LOG.trace("A receive is in progress");
120                }
121                return;
122            }
123            if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
124                if (LOG.isDebugEnabled()) {
125                    LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
126                }
127                ASYNC_TASKS.execute(new Runnable() {
128                    public void run() {
129                        if (protocolConverter != null) {
130                            protocolConverter.onTransportError();
131                        }
132                        onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
133                    }
134                });
135            } else {
136                if (LOG.isTraceEnabled()) {
137                    LOG.trace("Message received since last read check, resetting flag: ");
138                }
139            }
140            commandReceived.set(false);
141        }
142    
143        public void onCommand(Object command) {
144            commandReceived.set(true);
145            inReceive.set(true);
146            try {
147                transportListener.onCommand(command);
148            } finally {
149                inReceive.set(false);
150            }
151        }
152    
153        public void oneway(Object o) throws IOException {
154            // To prevent the inactivity monitor from sending a message while we
155            // are performing a send we take the lock.
156            this.sendLock.lock();
157            try {
158                doOnewaySend(o);
159            } finally {
160                this.sendLock.unlock();
161            }
162        }
163    
164        // Must be called under lock, either read or write on sendLock.
165        private void doOnewaySend(Object command) throws IOException {
166            if (failed.get()) {
167                throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
168            }
169            next.oneway(command);
170        }
171    
172        public void onException(IOException error) {
173            if (failed.compareAndSet(false, true)) {
174                stopMonitorThread();
175                transportListener.onException(error);
176            }
177        }
178    
179        public long getReadCheckTime() {
180            return readCheckTime;
181        }
182    
183        public void setReadCheckTime(long readCheckTime) {
184            this.readCheckTime = readCheckTime;
185        }
186    
187        public long getInitialDelayTime() {
188            return initialDelayTime;
189        }
190    
191        public void setInitialDelayTime(long initialDelayTime) {
192            this.initialDelayTime = initialDelayTime;
193        }
194    
195        public boolean isKeepAliveResponseRequired() {
196            return this.keepAliveResponseRequired;
197        }
198    
199        public void setKeepAliveResponseRequired(boolean value) {
200            this.keepAliveResponseRequired = value;
201        }
202    
203        public boolean isMonitorStarted() {
204            return this.monitorStarted.get();
205        }
206    
207        public void setProtocolConverter(MQTTProtocolConverter protocolConverter) {
208            this.protocolConverter = protocolConverter;
209        }
210    
211        public MQTTProtocolConverter getProtocolConverter() {
212            return protocolConverter;
213        }
214    
215        synchronized void startMonitorThread() {
216    
217            // Not yet configured if this isn't set yet.
218            if (protocolConverter == null) {
219                return;
220            }
221    
222            if (monitorStarted.get()) {
223                return;
224            }
225    
226            if (readCheckTime > 0) {
227                readCheckerTask = new SchedulerTimerTask(readChecker);
228            }
229    
230            if (readCheckTime > 0) {
231                monitorStarted.set(true);
232                synchronized (AbstractInactivityMonitor.class) {
233                    if (CHECKER_COUNTER == 0) {
234                        ASYNC_TASKS = createExecutor();
235                        READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
236                    }
237                    CHECKER_COUNTER++;
238                    if (readCheckTime > 0) {
239                        READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
240                    }
241                }
242            }
243        }
244    
245        synchronized void stopMonitorThread() {
246            if (monitorStarted.compareAndSet(true, false)) {
247                if (readCheckerTask != null) {
248                    readCheckerTask.cancel();
249                }
250    
251                synchronized (AbstractInactivityMonitor.class) {
252                    READ_CHECK_TIMER.purge();
253                    CHECKER_COUNTER--;
254                    if (CHECKER_COUNTER == 0) {
255                        READ_CHECK_TIMER.cancel();
256                        READ_CHECK_TIMER = null;
257                        ThreadPoolUtils.shutdown(ASYNC_TASKS);
258                        ASYNC_TASKS = null;
259                    }
260                }
261            }
262        }
263    
264        private ThreadFactory factory = new ThreadFactory() {
265            public Thread newThread(Runnable runnable) {
266                Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
267                thread.setDaemon(true);
268                return thread;
269            }
270        };
271    
272        private ThreadPoolExecutor createExecutor() {
273            ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
274            exec.allowCoreThreadTimeOut(true);
275            return exec;
276        }
277    }