001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.transport.mqtt;
019    
020    import java.io.IOException;
021    import java.util.Timer;
022    import java.util.concurrent.SynchronousQueue;
023    import java.util.concurrent.ThreadFactory;
024    import java.util.concurrent.ThreadPoolExecutor;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicInteger;
028    import java.util.concurrent.locks.ReentrantReadWriteLock;
029    
030    import org.apache.activemq.command.KeepAliveInfo;
031    import org.apache.activemq.thread.SchedulerTimerTask;
032    import org.apache.activemq.transport.AbstractInactivityMonitor;
033    import org.apache.activemq.transport.InactivityIOException;
034    import org.apache.activemq.transport.Transport;
035    import org.apache.activemq.transport.TransportFilter;
036    import org.apache.activemq.util.ThreadPoolUtils;
037    import org.apache.activemq.wireformat.WireFormat;
038    import org.slf4j.Logger;
039    import org.slf4j.LoggerFactory;
040    
041    public class MQTTInactivityMonitor extends TransportFilter {
042    
043        private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
044    
045        private static ThreadPoolExecutor ASYNC_TASKS;
046        private static int CHECKER_COUNTER;
047        private static long DEFAULT_CHECK_TIME_MILLS = 30000;
048        private static Timer READ_CHECK_TIMER;
049    
050        private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
051    
052        private final AtomicBoolean commandSent = new AtomicBoolean(false);
053        private final AtomicBoolean inSend = new AtomicBoolean(false);
054        private final AtomicBoolean failed = new AtomicBoolean(false);
055    
056        private final AtomicBoolean commandReceived = new AtomicBoolean(true);
057        private final AtomicBoolean inReceive = new AtomicBoolean(false);
058        private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
059    
060        private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
061        private SchedulerTimerTask readCheckerTask;
062    
063        private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
064        private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
065        private boolean keepAliveResponseRequired;
066        private MQTTProtocolConverter protocolConverter;
067    
068    
069        private final Runnable readChecker = new Runnable() {
070            long lastRunTime;
071    
072            public void run() {
073                long now = System.currentTimeMillis();
074                long elapsed = (now - lastRunTime);
075    
076                if (lastRunTime != 0 && LOG.isDebugEnabled()) {
077                    LOG.debug("" + elapsed + " ms elapsed since last read check.");
078                }
079    
080                // Perhaps the timer executed a read check late.. and then executes
081                // the next read check on time which causes the time elapsed between
082                // read checks to be small..
083    
084                // If less than 90% of the read check Time elapsed then abort this readcheck.
085                if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression.
086                    LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
087                    return;
088                }
089    
090                lastRunTime = now;
091                readCheck();
092            }
093        };
094    
095        private boolean allowReadCheck(long elapsed) {
096            return elapsed > (readCheckTime * 9 / 10);
097        }
098    
099    
100        public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
101            super(next);
102        }
103    
104        public void start() throws Exception {
105            next.start();
106            startMonitorThread();
107        }
108    
109        public void stop() throws Exception {
110            stopMonitorThread();
111            next.stop();
112        }
113    
114    
115        final void readCheck() {
116            int currentCounter = next.getReceiveCounter();
117            int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
118            if (inReceive.get() || currentCounter != previousCounter) {
119                if (LOG.isTraceEnabled()) {
120                    LOG.trace("A receive is in progress");
121                }
122                return;
123            }
124            if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
125                if (LOG.isDebugEnabled()) {
126                    LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
127                }
128                ASYNC_TASKS.execute(new Runnable() {
129                    public void run() {
130                        if (protocolConverter != null) {
131                            protocolConverter.onTransportError();
132                        }
133                        onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
134                    }
135    
136                    ;
137                });
138            } else {
139                if (LOG.isTraceEnabled()) {
140                    LOG.trace("Message received since last read check, resetting flag: ");
141                }
142            }
143            commandReceived.set(false);
144        }
145    
146    
147        public void onCommand(Object command) {
148            commandReceived.set(true);
149            inReceive.set(true);
150            try {
151                if (command.getClass() == KeepAliveInfo.class) {
152                    KeepAliveInfo info = (KeepAliveInfo) command;
153                    if (info.isResponseRequired()) {
154                        sendLock.readLock().lock();
155                        try {
156                            info.setResponseRequired(false);
157                            oneway(info);
158                        } catch (IOException e) {
159                            onException(e);
160                        } finally {
161                            sendLock.readLock().unlock();
162                        }
163                    }
164                } else {
165                    transportListener.onCommand(command);
166                }
167            } finally {
168                inReceive.set(false);
169            }
170        }
171    
172        public void oneway(Object o) throws IOException {
173            // To prevent the inactivity monitor from sending a message while we
174            // are performing a send we take a read lock.  The inactivity monitor
175            // sends its Heart-beat commands under a write lock.  This means that
176            // the MutexTransport is still responsible for synchronizing sends
177            this.sendLock.readLock().lock();
178            inSend.set(true);
179            try {
180                doOnewaySend(o);
181            } finally {
182                commandSent.set(true);
183                inSend.set(false);
184                this.sendLock.readLock().unlock();
185            }
186        }
187    
188        // Must be called under lock, either read or write on sendLock.
189        private void doOnewaySend(Object command) throws IOException {
190            if (failed.get()) {
191                throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
192            }
193            next.oneway(command);
194        }
195    
196        public void onException(IOException error) {
197            if (failed.compareAndSet(false, true)) {
198                stopMonitorThread();
199                transportListener.onException(error);
200            }
201        }
202    
203    
204        public long getReadCheckTime() {
205            return readCheckTime;
206        }
207    
208        public void setReadCheckTime(long readCheckTime) {
209            this.readCheckTime = readCheckTime;
210        }
211    
212    
213        public long getInitialDelayTime() {
214            return initialDelayTime;
215        }
216    
217        public void setInitialDelayTime(long initialDelayTime) {
218            this.initialDelayTime = initialDelayTime;
219        }
220    
221        public boolean isKeepAliveResponseRequired() {
222            return this.keepAliveResponseRequired;
223        }
224    
225        public void setKeepAliveResponseRequired(boolean value) {
226            this.keepAliveResponseRequired = value;
227        }
228    
229        public boolean isMonitorStarted() {
230            return this.monitorStarted.get();
231        }
232    
233        public void setProtocolConverter(MQTTProtocolConverter protocolConverter) {
234            this.protocolConverter = protocolConverter;
235        }
236    
237        public MQTTProtocolConverter getProtocolConverter() {
238            return protocolConverter;
239        }
240    
241        synchronized void startMonitorThread() {
242            if (monitorStarted.get()) {
243                return;
244            }
245    
246    
247            if (readCheckTime > 0) {
248                readCheckerTask = new SchedulerTimerTask(readChecker);
249            }
250    
251    
252            if (readCheckTime > 0) {
253                monitorStarted.set(true);
254                synchronized (AbstractInactivityMonitor.class) {
255                    if (CHECKER_COUNTER == 0) {
256                        ASYNC_TASKS = createExecutor();
257                        READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
258                    }
259                    CHECKER_COUNTER++;
260                    if (readCheckTime > 0) {
261                        READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
262                    }
263                }
264            }
265        }
266    
267    
268        synchronized void stopMonitorThread() {
269            if (monitorStarted.compareAndSet(true, false)) {
270                if (readCheckerTask != null) {
271                    readCheckerTask.cancel();
272                }
273    
274                synchronized (AbstractInactivityMonitor.class) {
275                    READ_CHECK_TIMER.purge();
276                    CHECKER_COUNTER--;
277                    if (CHECKER_COUNTER == 0) {
278                        READ_CHECK_TIMER.cancel();
279                        READ_CHECK_TIMER = null;
280                        ThreadPoolUtils.shutdown(ASYNC_TASKS);
281                        ASYNC_TASKS = null;
282                    }
283                }
284            }
285        }
286    
287        private ThreadFactory factory = new ThreadFactory() {
288            public Thread newThread(Runnable runnable) {
289                Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
290                thread.setDaemon(true);
291                return thread;
292            }
293        };
294    
295        private ThreadPoolExecutor createExecutor() {
296            ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
297            exec.allowCoreThreadTimeOut(true);
298            return exec;
299        }
300    }
301