001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.transport.mqtt;
019
020 import java.io.IOException;
021 import java.util.Timer;
022 import java.util.concurrent.SynchronousQueue;
023 import java.util.concurrent.ThreadFactory;
024 import java.util.concurrent.ThreadPoolExecutor;
025 import java.util.concurrent.TimeUnit;
026 import java.util.concurrent.atomic.AtomicBoolean;
027 import java.util.concurrent.atomic.AtomicInteger;
028 import java.util.concurrent.locks.ReentrantReadWriteLock;
029
030 import org.apache.activemq.command.KeepAliveInfo;
031 import org.apache.activemq.thread.SchedulerTimerTask;
032 import org.apache.activemq.transport.AbstractInactivityMonitor;
033 import org.apache.activemq.transport.InactivityIOException;
034 import org.apache.activemq.transport.Transport;
035 import org.apache.activemq.transport.TransportFilter;
036 import org.apache.activemq.util.ThreadPoolUtils;
037 import org.apache.activemq.wireformat.WireFormat;
038 import org.slf4j.Logger;
039 import org.slf4j.LoggerFactory;
040
041 public class MQTTInactivityMonitor extends TransportFilter {
042
043 private static final Logger LOG = LoggerFactory.getLogger(MQTTInactivityMonitor.class);
044
045 private static ThreadPoolExecutor ASYNC_TASKS;
046 private static int CHECKER_COUNTER;
047 private static long DEFAULT_CHECK_TIME_MILLS = 30000;
048 private static Timer READ_CHECK_TIMER;
049
050 private final AtomicBoolean monitorStarted = new AtomicBoolean(false);
051
052 private final AtomicBoolean commandSent = new AtomicBoolean(false);
053 private final AtomicBoolean inSend = new AtomicBoolean(false);
054 private final AtomicBoolean failed = new AtomicBoolean(false);
055
056 private final AtomicBoolean commandReceived = new AtomicBoolean(true);
057 private final AtomicBoolean inReceive = new AtomicBoolean(false);
058 private final AtomicInteger lastReceiveCounter = new AtomicInteger(0);
059
060 private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
061 private SchedulerTimerTask readCheckerTask;
062
063 private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
064 private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
065 private boolean keepAliveResponseRequired;
066 private MQTTProtocolConverter protocolConverter;
067
068
069 private final Runnable readChecker = new Runnable() {
070 long lastRunTime;
071
072 public void run() {
073 long now = System.currentTimeMillis();
074 long elapsed = (now - lastRunTime);
075
076 if (lastRunTime != 0 && LOG.isDebugEnabled()) {
077 LOG.debug("" + elapsed + " ms elapsed since last read check.");
078 }
079
080 // Perhaps the timer executed a read check late.. and then executes
081 // the next read check on time which causes the time elapsed between
082 // read checks to be small..
083
084 // If less than 90% of the read check Time elapsed then abort this readcheck.
085 if (!allowReadCheck(elapsed)) { // FUNKY qdox bug does not allow me to inline this expression.
086 LOG.debug("Aborting read check.. Not enough time elapsed since last read check.");
087 return;
088 }
089
090 lastRunTime = now;
091 readCheck();
092 }
093 };
094
095 private boolean allowReadCheck(long elapsed) {
096 return elapsed > (readCheckTime * 9 / 10);
097 }
098
099
100 public MQTTInactivityMonitor(Transport next, WireFormat wireFormat) {
101 super(next);
102 }
103
104 public void start() throws Exception {
105 next.start();
106 startMonitorThread();
107 }
108
109 public void stop() throws Exception {
110 stopMonitorThread();
111 next.stop();
112 }
113
114
115 final void readCheck() {
116 int currentCounter = next.getReceiveCounter();
117 int previousCounter = lastReceiveCounter.getAndSet(currentCounter);
118 if (inReceive.get() || currentCounter != previousCounter) {
119 if (LOG.isTraceEnabled()) {
120 LOG.trace("A receive is in progress");
121 }
122 return;
123 }
124 if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating()) {
125 if (LOG.isDebugEnabled()) {
126 LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
127 }
128 ASYNC_TASKS.execute(new Runnable() {
129 public void run() {
130 if (protocolConverter != null) {
131 protocolConverter.onTransportError();
132 }
133 onException(new InactivityIOException("Channel was inactive for too (>" + readCheckTime + ") long: " + next.getRemoteAddress()));
134 }
135
136 ;
137 });
138 } else {
139 if (LOG.isTraceEnabled()) {
140 LOG.trace("Message received since last read check, resetting flag: ");
141 }
142 }
143 commandReceived.set(false);
144 }
145
146
147 public void onCommand(Object command) {
148 commandReceived.set(true);
149 inReceive.set(true);
150 try {
151 if (command.getClass() == KeepAliveInfo.class) {
152 KeepAliveInfo info = (KeepAliveInfo) command;
153 if (info.isResponseRequired()) {
154 sendLock.readLock().lock();
155 try {
156 info.setResponseRequired(false);
157 oneway(info);
158 } catch (IOException e) {
159 onException(e);
160 } finally {
161 sendLock.readLock().unlock();
162 }
163 }
164 } else {
165 transportListener.onCommand(command);
166 }
167 } finally {
168 inReceive.set(false);
169 }
170 }
171
172 public void oneway(Object o) throws IOException {
173 // To prevent the inactivity monitor from sending a message while we
174 // are performing a send we take a read lock. The inactivity monitor
175 // sends its Heart-beat commands under a write lock. This means that
176 // the MutexTransport is still responsible for synchronizing sends
177 this.sendLock.readLock().lock();
178 inSend.set(true);
179 try {
180 doOnewaySend(o);
181 } finally {
182 commandSent.set(true);
183 inSend.set(false);
184 this.sendLock.readLock().unlock();
185 }
186 }
187
188 // Must be called under lock, either read or write on sendLock.
189 private void doOnewaySend(Object command) throws IOException {
190 if (failed.get()) {
191 throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
192 }
193 next.oneway(command);
194 }
195
196 public void onException(IOException error) {
197 if (failed.compareAndSet(false, true)) {
198 stopMonitorThread();
199 transportListener.onException(error);
200 }
201 }
202
203
204 public long getReadCheckTime() {
205 return readCheckTime;
206 }
207
208 public void setReadCheckTime(long readCheckTime) {
209 this.readCheckTime = readCheckTime;
210 }
211
212
213 public long getInitialDelayTime() {
214 return initialDelayTime;
215 }
216
217 public void setInitialDelayTime(long initialDelayTime) {
218 this.initialDelayTime = initialDelayTime;
219 }
220
221 public boolean isKeepAliveResponseRequired() {
222 return this.keepAliveResponseRequired;
223 }
224
225 public void setKeepAliveResponseRequired(boolean value) {
226 this.keepAliveResponseRequired = value;
227 }
228
229 public boolean isMonitorStarted() {
230 return this.monitorStarted.get();
231 }
232
233 public void setProtocolConverter(MQTTProtocolConverter protocolConverter) {
234 this.protocolConverter = protocolConverter;
235 }
236
237 public MQTTProtocolConverter getProtocolConverter() {
238 return protocolConverter;
239 }
240
241 synchronized void startMonitorThread() {
242 if (monitorStarted.get()) {
243 return;
244 }
245
246
247 if (readCheckTime > 0) {
248 readCheckerTask = new SchedulerTimerTask(readChecker);
249 }
250
251
252 if (readCheckTime > 0) {
253 monitorStarted.set(true);
254 synchronized (AbstractInactivityMonitor.class) {
255 if (CHECKER_COUNTER == 0) {
256 ASYNC_TASKS = createExecutor();
257 READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
258 }
259 CHECKER_COUNTER++;
260 if (readCheckTime > 0) {
261 READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
262 }
263 }
264 }
265 }
266
267
268 synchronized void stopMonitorThread() {
269 if (monitorStarted.compareAndSet(true, false)) {
270 if (readCheckerTask != null) {
271 readCheckerTask.cancel();
272 }
273
274 synchronized (AbstractInactivityMonitor.class) {
275 READ_CHECK_TIMER.purge();
276 CHECKER_COUNTER--;
277 if (CHECKER_COUNTER == 0) {
278 READ_CHECK_TIMER.cancel();
279 READ_CHECK_TIMER = null;
280 ThreadPoolUtils.shutdown(ASYNC_TASKS);
281 ASYNC_TASKS = null;
282 }
283 }
284 }
285 }
286
287 private ThreadFactory factory = new ThreadFactory() {
288 public Thread newThread(Runnable runnable) {
289 Thread thread = new Thread(runnable, "MQTTInactivityMonitor Async Task: " + runnable);
290 thread.setDaemon(true);
291 return thread;
292 }
293 };
294
295 private ThreadPoolExecutor createExecutor() {
296 ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), factory);
297 exec.allowCoreThreadTimeOut(true);
298 return exec;
299 }
300 }
301