001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.thread;
018
019import java.util.concurrent.Executor;
020import java.util.concurrent.ExecutorService;
021import java.util.concurrent.RejectedExecutionHandler;
022import java.util.concurrent.SynchronousQueue;
023import java.util.concurrent.ThreadFactory;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028import java.util.concurrent.atomic.AtomicReference;
029
030import org.apache.activemq.util.ThreadPoolUtils;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Manages the thread pool for long running tasks. Long running tasks are not
036 * always active but when they are active, they may need a few iterations of
037 * processing for them to become idle. The manager ensures that each task is
038 * processes but that no one task overtakes the system. This is somewhat like
039 * cooperative multitasking.
040 *
041 * @org.apache.xbean.XBean
042 */
043public class TaskRunnerFactory implements Executor {
044
045    private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerFactory.class);
046    private final AtomicReference<ExecutorService> executorRef = new AtomicReference<>();
047    private int maxIterationsPerRun;
048    private String name;
049    private int priority;
050    private boolean daemon;
051    private final AtomicLong id = new AtomicLong(0);
052    private boolean dedicatedTaskRunner;
053    private long shutdownAwaitTermination = 30000;
054    private final AtomicBoolean initDone = new AtomicBoolean(false);
055    private int maxThreadPoolSize = getDefaultMaximumPoolSize();
056    private RejectedExecutionHandler rejectedTaskHandler = null;
057    private ClassLoader threadClassLoader;
058
059    public TaskRunnerFactory() {
060        this("ActiveMQ Task");
061    }
062
063    public TaskRunnerFactory(String name) {
064        this(name, Thread.NORM_PRIORITY, true, 1000);
065    }
066
067    private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
068        this(name, priority, daemon, maxIterationsPerRun, false);
069    }
070
071    public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
072        this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, getDefaultMaximumPoolSize());
073    }
074
075    public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) {
076        this.name = name;
077        this.priority = priority;
078        this.daemon = daemon;
079        this.maxIterationsPerRun = maxIterationsPerRun;
080        this.dedicatedTaskRunner = dedicatedTaskRunner;
081        this.maxThreadPoolSize = maxThreadPoolSize;
082    }
083
084    public void init() {
085        if (!initDone.get()) {
086            // If your OS/JVM combination has a good thread model, you may want to
087            // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead.
088            //AMQ-6602 - lock instead of using compareAndSet to prevent threads from seeing a null value
089            //for executorRef inside createTaskRunner() on contention and creating a DedicatedTaskRunner
090            synchronized(this) {
091                //need to recheck if initDone is true under the lock
092                if (!initDone.get()) {
093                    if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
094                        executorRef.set(null);
095                    } else {
096                        executorRef.compareAndSet(null, createDefaultExecutor());
097                    }
098                    LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executorRef.get());
099                    initDone.set(true);
100                }
101            }
102        }
103    }
104
105    /**
106     * Performs a shutdown only, by which the thread pool is shutdown by not graceful nor aggressively.
107     *
108     * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService)
109     */
110    public void shutdown() {
111        ExecutorService executor = executorRef.get();
112        if (executor != null) {
113            ThreadPoolUtils.shutdown(executor);
114        }
115        clearExecutor();
116    }
117
118    /**
119     * Performs a shutdown now (aggressively) on the thread pool.
120     *
121     * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService)
122     */
123    public void shutdownNow() {
124        ExecutorService executor = executorRef.get();
125        if (executor != null) {
126            ThreadPoolUtils.shutdownNow(executor);
127        }
128        clearExecutor();
129    }
130
131    /**
132     * Performs a graceful shutdown.
133     *
134     * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService)
135     */
136    public void shutdownGraceful() {
137        ExecutorService executor = executorRef.get();
138        if (executor != null) {
139            ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination);
140        }
141        clearExecutor();
142    }
143
144    private void clearExecutor() {
145        //clear under a lock to prevent threads from seeing initDone == true
146        //but then getting null from executorRef
147        synchronized(this) {
148            executorRef.set(null);
149            initDone.set(false);
150        }
151    }
152
153    public TaskRunner createTaskRunner(Task task, String name) {
154        init();
155        ExecutorService executor = executorRef.get();
156        if (executor != null) {
157            return new PooledTaskRunner(executor, task, maxIterationsPerRun);
158        } else {
159            return new DedicatedTaskRunner(task, name, priority, daemon);
160        }
161    }
162
163    @Override
164    public void execute(Runnable runnable) {
165        execute(runnable, name);
166    }
167
168    public void execute(Runnable runnable, String name) {
169        init();
170        LOG.trace("Execute[{}] runnable: {}", name, runnable);
171        ExecutorService executor = executorRef.get();
172        if (executor != null) {
173            executor.execute(runnable);
174        } else {
175            doExecuteNewThread(runnable, name);
176        }
177    }
178
179    private void doExecuteNewThread(Runnable runnable, String name) {
180        String threadName = name + "-" + id.incrementAndGet();
181        Thread thread = new Thread(runnable, threadName);
182        thread.setDaemon(daemon);
183
184        LOG.trace("Created and running thread[{}]: {}", threadName, thread);
185        thread.start();
186    }
187
188    protected ExecutorService createDefaultExecutor() {
189        ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getMaxThreadPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
190            @Override
191            public Thread newThread(Runnable runnable) {
192                String threadName = name + "-" + id.incrementAndGet();
193                Thread thread = new Thread(runnable, threadName);
194                thread.setDaemon(daemon);
195                thread.setPriority(priority);
196                if (threadClassLoader != null) {
197                    thread.setContextClassLoader(threadClassLoader);
198                }
199                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
200                    @Override
201                    public void uncaughtException(final Thread t, final Throwable e) {
202                        LOG.error("Error in thread '{}'", t.getName(), e);
203                    }
204                });
205
206                LOG.trace("Created thread[{}]: {}", threadName, thread);
207                return thread;
208            }
209        });
210
211        if (rejectedTaskHandler != null) {
212            rc.setRejectedExecutionHandler(rejectedTaskHandler);
213        } else {
214            rc.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
215        }
216
217        return rc;
218    }
219
220    public ExecutorService getExecutor() {
221        return executorRef.get();
222    }
223
224    public void setExecutor(ExecutorService executor) {
225        this.executorRef.set(executor);
226    }
227
228    public int getMaxIterationsPerRun() {
229        return maxIterationsPerRun;
230    }
231
232    public void setMaxIterationsPerRun(int maxIterationsPerRun) {
233        this.maxIterationsPerRun = maxIterationsPerRun;
234    }
235
236    public String getName() {
237        return name;
238    }
239
240    public void setName(String name) {
241        this.name = name;
242    }
243
244    public int getPriority() {
245        return priority;
246    }
247
248    public void setPriority(int priority) {
249        this.priority = priority;
250    }
251
252    public boolean isDaemon() {
253        return daemon;
254    }
255
256    public void setDaemon(boolean daemon) {
257        this.daemon = daemon;
258    }
259
260    public boolean isDedicatedTaskRunner() {
261        return dedicatedTaskRunner;
262    }
263
264    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
265        this.dedicatedTaskRunner = dedicatedTaskRunner;
266    }
267
268    public int getMaxThreadPoolSize() {
269        return maxThreadPoolSize;
270    }
271
272    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
273        this.maxThreadPoolSize = maxThreadPoolSize;
274    }
275
276    public void setThreadClassLoader(ClassLoader threadClassLoader) {
277        this.threadClassLoader = threadClassLoader;
278    }
279
280    public RejectedExecutionHandler getRejectedTaskHandler() {
281        return rejectedTaskHandler;
282    }
283
284    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
285        this.rejectedTaskHandler = rejectedTaskHandler;
286    }
287
288    public long getShutdownAwaitTermination() {
289        return shutdownAwaitTermination;
290    }
291
292    public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
293        this.shutdownAwaitTermination = shutdownAwaitTermination;
294    }
295
296    private static int getDefaultCorePoolSize() {
297        return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.corePoolSize", 0);
298    }
299
300    private static int getDefaultMaximumPoolSize() {
301        return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.maximumPoolSize", Integer.MAX_VALUE);
302    }
303
304    private static int getDefaultKeepAliveTime() {
305        return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.keepAliveTime", 30);
306    }
307}