001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.thread;
018
019import java.util.concurrent.Executor;
020import java.util.concurrent.ExecutorService;
021import java.util.concurrent.RejectedExecutionHandler;
022import java.util.concurrent.SynchronousQueue;
023import java.util.concurrent.ThreadFactory;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicLong;
028
029import org.apache.activemq.util.ThreadPoolUtils;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Manages the thread pool for long running tasks. Long running tasks are not
035 * always active but when they are active, they may need a few iterations of
036 * processing for them to become idle. The manager ensures that each task is
037 * processes but that no one task overtakes the system. This is somewhat like
038 * cooperative multitasking.
039 *
040 * @org.apache.xbean.XBean
041 */
042public class TaskRunnerFactory implements Executor {
043
044    private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerFactory.class);
045    private ExecutorService executor;
046    private int maxIterationsPerRun;
047    private String name;
048    private int priority;
049    private boolean daemon;
050    private final AtomicLong id = new AtomicLong(0);
051    private boolean dedicatedTaskRunner;
052    private long shutdownAwaitTermination = 30000;
053    private final AtomicBoolean initDone = new AtomicBoolean(false);
054    private int maxThreadPoolSize = getDefaultMaximumPoolSize();
055    private RejectedExecutionHandler rejectedTaskHandler = null;
056    private ClassLoader threadClassLoader;
057
058    public TaskRunnerFactory() {
059        this("ActiveMQ Task");
060    }
061
062    public TaskRunnerFactory(String name) {
063        this(name, Thread.NORM_PRIORITY, true, 1000);
064    }
065
066    private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
067        this(name, priority, daemon, maxIterationsPerRun, false);
068    }
069
070    public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
071        this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, getDefaultMaximumPoolSize());
072    }
073
074    public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) {
075        this.name = name;
076        this.priority = priority;
077        this.daemon = daemon;
078        this.maxIterationsPerRun = maxIterationsPerRun;
079        this.dedicatedTaskRunner = dedicatedTaskRunner;
080        this.maxThreadPoolSize = maxThreadPoolSize;
081    }
082
083    public void init() {
084        if (initDone.compareAndSet(false, true)) {
085            // If your OS/JVM combination has a good thread model, you may want to
086            // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead.
087            if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
088                executor = null;
089            } else if (executor == null) {
090                executor = createDefaultExecutor();
091            }
092            LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executor);
093        }
094    }
095
096    /**
097     * Performs a shutdown only, by which the thread pool is shutdown by not graceful nor aggressively.
098     *
099     * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService)
100     */
101    public void shutdown() {
102        if (executor != null) {
103            ThreadPoolUtils.shutdown(executor);
104            executor = null;
105        }
106        initDone.set(false);
107    }
108
109    /**
110     * Performs a shutdown now (aggressively) on the thread pool.
111     *
112     * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService)
113     */
114    public void shutdownNow() {
115        if (executor != null) {
116            ThreadPoolUtils.shutdownNow(executor);
117            executor = null;
118        }
119        initDone.set(false);
120    }
121
122    /**
123     * Performs a graceful shutdown.
124     *
125     * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService)
126     */
127    public void shutdownGraceful() {
128        if (executor != null) {
129            ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination);
130            executor = null;
131        }
132        initDone.set(false);
133    }
134
135    public TaskRunner createTaskRunner(Task task, String name) {
136        init();
137        if (executor != null) {
138            return new PooledTaskRunner(executor, task, maxIterationsPerRun);
139        } else {
140            return new DedicatedTaskRunner(task, name, priority, daemon);
141        }
142    }
143
144    @Override
145    public void execute(Runnable runnable) {
146        execute(runnable, name);
147    }
148
149    public void execute(Runnable runnable, String name) {
150        init();
151        LOG.trace("Execute[{}] runnable: {}", name, runnable);
152        if (executor != null) {
153            executor.execute(runnable);
154        } else {
155            doExecuteNewThread(runnable, name);
156        }
157    }
158
159    private void doExecuteNewThread(Runnable runnable, String name) {
160        String threadName = name + "-" + id.incrementAndGet();
161        Thread thread = new Thread(runnable, threadName);
162        thread.setDaemon(daemon);
163
164        LOG.trace("Created and running thread[{}]: {}", threadName, thread);
165        thread.start();
166    }
167
168    protected ExecutorService createDefaultExecutor() {
169        ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getMaxThreadPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
170            @Override
171            public Thread newThread(Runnable runnable) {
172                String threadName = name + "-" + id.incrementAndGet();
173                Thread thread = new Thread(runnable, threadName);
174                thread.setDaemon(daemon);
175                thread.setPriority(priority);
176                if (threadClassLoader != null) {
177                    thread.setContextClassLoader(threadClassLoader);
178                }
179                thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
180                    @Override
181                    public void uncaughtException(final Thread t, final Throwable e) {
182                        LOG.error("Error in thread '{}'", t.getName(), e);
183                    }
184                });
185
186                LOG.trace("Created thread[{}]: {}", threadName, thread);
187                return thread;
188            }
189        });
190        
191        if (rejectedTaskHandler != null) {
192            rc.setRejectedExecutionHandler(rejectedTaskHandler);
193        }
194        
195        return rc;
196    }
197
198    public ExecutorService getExecutor() {
199        return executor;
200    }
201
202    public void setExecutor(ExecutorService executor) {
203        this.executor = executor;
204    }
205
206    public int getMaxIterationsPerRun() {
207        return maxIterationsPerRun;
208    }
209
210    public void setMaxIterationsPerRun(int maxIterationsPerRun) {
211        this.maxIterationsPerRun = maxIterationsPerRun;
212    }
213
214    public String getName() {
215        return name;
216    }
217
218    public void setName(String name) {
219        this.name = name;
220    }
221
222    public int getPriority() {
223        return priority;
224    }
225
226    public void setPriority(int priority) {
227        this.priority = priority;
228    }
229
230    public boolean isDaemon() {
231        return daemon;
232    }
233
234    public void setDaemon(boolean daemon) {
235        this.daemon = daemon;
236    }
237
238    public boolean isDedicatedTaskRunner() {
239        return dedicatedTaskRunner;
240    }
241
242    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
243        this.dedicatedTaskRunner = dedicatedTaskRunner;
244    }
245
246    public int getMaxThreadPoolSize() {
247        return maxThreadPoolSize;
248    }
249
250    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
251        this.maxThreadPoolSize = maxThreadPoolSize;
252    }
253
254    public void setThreadClassLoader(ClassLoader threadClassLoader) {
255        this.threadClassLoader = threadClassLoader;
256    }
257
258    public RejectedExecutionHandler getRejectedTaskHandler() {
259        return rejectedTaskHandler;
260    }
261
262    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
263        this.rejectedTaskHandler = rejectedTaskHandler;
264    }
265
266    public long getShutdownAwaitTermination() {
267        return shutdownAwaitTermination;
268    }
269
270    public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
271        this.shutdownAwaitTermination = shutdownAwaitTermination;
272    }
273
274    private static int getDefaultCorePoolSize() {
275        return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.corePoolSize", 0);
276    }
277
278    private static int getDefaultMaximumPoolSize() {
279        return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.maximumPoolSize", Integer.MAX_VALUE);
280    }
281
282    private static int getDefaultKeepAliveTime() {
283        return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.keepAliveTime", 30);
284    }
285}