/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017
018package org.apache.activemq.transport.mqtt;
019
020import java.io.IOException;
021import java.util.Timer;
022import java.util.concurrent.RejectedExecutionException;
023import java.util.concurrent.SynchronousQueue;
024import java.util.concurrent.ThreadFactory;
025import java.util.concurrent.ThreadPoolExecutor;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicBoolean;
028import java.util.concurrent.atomic.AtomicInteger;
029import java.util.concurrent.locks.ReentrantLock;
030
031import org.apache.activemq.thread.SchedulerTimerTask;
032import org.apache.activemq.transport.AbstractInactivityMonitor;
033import org.apache.activemq.transport.InactivityIOException;
034import org.apache.activemq.transport.Transport;
035import org.apache.activemq.transport.TransportFilter;
036import org.apache.activemq.wireformat.WireFormat;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040public class MQTTInactivityMonitor extends TransportFilter {
041
042    private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
043
044    private static final long DEFAULT_CHECK_TIME_MILLS = 30000;
045
046    private static ThreadPoolExecutor ASYNC_TASKS;
047    private static int CHECKER_COUNTER;
048    private static Timer READ_CHECK_TIMER;
049
050    private final AtomicBoolean failed = new AtomicBoolean(false);
051    private final AtomicBoolean inReceive = new AtomicBoolean(false);
052    private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
053
054    private final ReentrantLock sendLock = new ReentrantLock();
055    private SchedulerTimerTask readCheckerTask;
056
057    private long readGraceTime = DEFAULT_CHECK_TIME_MILLS;
058    private long readKeepAliveTime = DEFAULT_CHECK_TIME_MILLS;
059    private MQTTProtocolConverter protocolConverter;
060
061    private long connectionTimeout = MQTTWireFormat.DEFAULT_CONNECTION_TIMEOUT;
062    private SchedulerTimerTask connectCheckerTask;
063    private final Runnable connectChecker = new Runnable() {
064
065        private final long startTime = System.currentTimeMillis();
066
067        @Override
068        public void run() {
069
070            long now = System.currentTimeMillis();
071
072            if ((now - startTime) >= connectionTimeout && connectCheckerTask != null && !ASYNC_TASKS.isShutdown()) {
073                if (LOG.isDebugEnabled()) {
074                    LOG.debug("No CONNECT frame received in time for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
075                }
076
077                try {
078                    ASYNC_TASKS.execute(new Runnable() {
079                        @Override
080                        public void run() {
081                            onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime + readGraceTime) + ") long: "
082                                + next.getRemoteAddress()));
083                        }
084                    });
085                } catch (RejectedExecutionException ex) {
086                    if (!ASYNC_TASKS.isShutdown()) {
087                        LOG.error("Async connection timeout task was rejected from the executor: ", ex);
088                        throw ex;
089                    }
090                }
091            }
092        }
093    };
094
095    private final Runnable readChecker = new Runnable() {
096        long lastReceiveTime = System.currentTimeMillis();
097
098        @Override
099        public void run() {
100
101            long now = System.currentTimeMillis();
102            int currentCounter = next.getReceiveCounter();
103            int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
104
105            // for the PINGREQ/RESP frames, the currentCounter will be different
106            // from previousCounter, and that
107            // should be sufficient to indicate the connection is still alive.
108            // If there were random data, or something
109            // outside the scope of the spec, the wire format unrmarshalling
110            // would fail, so we don't need to handle
111            // PINGREQ/RESP explicitly here
112            if (inReceive.get() || currentCounter != previousCounter) {
113                if (LOG.isTraceEnabled()) {
114                    LOG.trace("Command received since last read check.");
115                }
116                lastReceiveTime = now;
117                return;
118            }
119
120            if ((now - lastReceiveTime) >= readKeepAliveTime + readGraceTime && readCheckerTask != null && !ASYNC_TASKS.isShutdown()) {
121                if (LOG.isDebugEnabled()) {
122                    LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
123                }
124                try {
125                    ASYNC_TASKS.execute(new Runnable() {
126                        @Override
127                        public void run() {
128                            onException(new InactivityIOException("Channel was inactive for too (>" +
129                                        (connectionTimeout) + ") long: " + next.getRemoteAddress()));
130                        }
131                    });
132                } catch (RejectedExecutionException ex) {
133                    if (!ASYNC_TASKS.isShutdown()) {
134                        LOG.error("Async connection timeout task was rejected from the executor: ", ex);
135                        throw ex;
136                    }
137                }
138            }
139        }
140    };
141
142    public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
143        super(next);
144    }
145
146    @Override
147    public void start() throws Exception {
148        next.start();
149    }
150
151    @Override
152    public void stop() throws Exception {
153        stopReadChecker();
154        stopConnectChecker();
155        next.stop();
156    }
157
158    @Override
159    public void onCommand(Object command) {
160        inReceive.set(true);
161        try {
162            transportListener.onCommand(command);
163        } finally {
164            inReceive.set(false);
165        }
166    }
167
168    @Override
169    public void oneway(Object o) throws IOException {
170        // To prevent the inactivity monitor from sending a message while we
171        // are performing a send we take the lock.
172        this.sendLock.lock();
173        try {
174            doOnewaySend(o);
175        } finally {
176            this.sendLock.unlock();
177        }
178    }
179
180    // Must be called under lock, either read or write on sendLock.
181    private void doOnewaySend(Object command) throws IOException {
182        if (failed.get()) {
183            throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
184        }
185        next.oneway(command);
186    }
187
188    @Override
189    public void onException(IOException error) {
190        if (failed.compareAndSet(false, true)) {
191            stopConnectChecker();
192            stopReadChecker();
193            if (protocolConverter != null) {
194                protocolConverter.onTransportError();
195            }
196            transportListener.onException(error);
197        }
198    }
199
200    public long getReadGraceTime() {
201        return readGraceTime;
202    }
203
204    public void setReadGraceTime(long readGraceTime) {
205        this.readGraceTime = readGraceTime;
206    }
207
208    public long getReadKeepAliveTime() {
209        return readKeepAliveTime;
210    }
211
212    public void setReadKeepAliveTime(long readKeepAliveTime) {
213        this.readKeepAliveTime = readKeepAliveTime;
214    }
215
216    public void setProtocolConverter(MQTTProtocolConverter protocolConverter) {
217        this.protocolConverter = protocolConverter;
218    }
219
220    public MQTTProtocolConverter getProtocolConverter() {
221        return protocolConverter;
222    }
223
224    public synchronized void startConnectChecker(long connectionTimeout) {
225        this.connectionTimeout = connectionTimeout;
226        if (connectionTimeout > 0 && connectCheckerTask == null) {
227            connectCheckerTask = new SchedulerTimerTask(connectChecker);
228
229            long connectionCheckInterval = Math.min(connectionTimeout, 1000);
230
231            synchronized (AbstractInactivityMonitor.class) {
232                if (CHECKER_COUNTER == 0) {
233                    if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
234                        ASYNC_TASKS = createExecutor();
235                    }
236                    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
237                }
238                CHECKER_COUNTER++;
239                READ_CHECK_TIMER.schedule(connectCheckerTask, connectionCheckInterval, connectionCheckInterval);
240            }
241        }
242    }
243
244    synchronized void startReadChecker() {
245        if (readKeepAliveTime > 0 && readCheckerTask == null) {
246            readCheckerTask = new SchedulerTimerTask(readChecker);
247
248            synchronized (AbstractInactivityMonitor.class) {
249                if (CHECKER_COUNTER == 0) {
250                    if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
251                        ASYNC_TASKS = createExecutor();
252                    }
253                    READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
254                }
255                CHECKER_COUNTER++;
256                READ_CHECK_TIMER.schedule(readCheckerTask, readKeepAliveTime, readGraceTime);
257            }
258        }
259    }
260
261    synchronized void stopConnectChecker() {
262        if (connectCheckerTask != null) {
263            connectCheckerTask.cancel();
264            connectCheckerTask = null;
265
266            synchronized (AbstractInactivityMonitor.class) {
267                READ_CHECK_TIMER.purge();
268                CHECKER_COUNTER--;
269                if (CHECKER_COUNTER == 0) {
270                    READ_CHECK_TIMER.cancel();
271                    READ_CHECK_TIMER = null;
272                }
273            }
274        }
275    }
276
277    synchronized void stopReadChecker() {
278        if (readCheckerTask != null) {
279            readCheckerTask.cancel();
280            readCheckerTask = null;
281
282            synchronized (AbstractInactivityMonitor.class) {
283                READ_CHECK_TIMER.purge();
284                CHECKER_COUNTER--;
285                if (CHECKER_COUNTER == 0) {
286                    READ_CHECK_TIMER.cancel();
287                    READ_CHECK_TIMER = null;
288                }
289            }
290        }
291    }
292
293    private final ThreadFactory factory = new ThreadFactory() {
294        @Override
295        public Thread newThread(Runnable runnable) {
296            Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
297            thread.setDaemon(true);
298            return thread;
299        }
300    };
301
302    private ThreadPoolExecutor createExecutor() {
303        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
304        exec.allowCoreThreadTimeOut(true);
305        return exec;
306    }
307}