001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.thread;
018
019 import java.util.concurrent.Executor;
020 import java.util.concurrent.ExecutorService;
021 import java.util.concurrent.RejectedExecutionHandler;
022 import java.util.concurrent.SynchronousQueue;
023 import java.util.concurrent.ThreadFactory;
024 import java.util.concurrent.ThreadPoolExecutor;
025 import java.util.concurrent.TimeUnit;
026 import java.util.concurrent.atomic.AtomicBoolean;
027 import java.util.concurrent.atomic.AtomicLong;
028
029 import org.apache.activemq.util.ThreadPoolUtils;
030 import org.slf4j.Logger;
031 import org.slf4j.LoggerFactory;
032
033 /**
034 * Manages the thread pool for long running tasks. Long running tasks are not
035 * always active but when they are active, they may need a few iterations of
036 * processing for them to become idle. The manager ensures that each task is
037 * processes but that no one task overtakes the system. This is kinda like
038 * cooperative multitasking.
039 *
040 * @org.apache.xbean.XBean
041 */
042 public class TaskRunnerFactory implements Executor {
043
044 private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerFactory.class);
045 private ExecutorService executor;
046 private int maxIterationsPerRun;
047 private String name;
048 private int priority;
049 private boolean daemon;
050 private AtomicLong id = new AtomicLong(0);
051 private boolean dedicatedTaskRunner;
052 private long shutdownAwaitTermination = 30000;
053 private AtomicBoolean initDone = new AtomicBoolean(false);
054 private int maxThreadPoolSize = Integer.MAX_VALUE;
055 private RejectedExecutionHandler rejectedTaskHandler = null;
056
057 public TaskRunnerFactory() {
058 this("ActiveMQ Task");
059 }
060
061 public TaskRunnerFactory(String name) {
062 this(name, Thread.NORM_PRIORITY, true, 1000);
063 }
064
065 private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
066 this(name,priority,daemon,maxIterationsPerRun,false);
067 }
068
069 public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
070 this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, Integer.MAX_VALUE);
071 }
072
073 public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) {
074 this.name = name;
075 this.priority = priority;
076 this.daemon = daemon;
077 this.maxIterationsPerRun = maxIterationsPerRun;
078 this.dedicatedTaskRunner = dedicatedTaskRunner;
079 this.maxThreadPoolSize = maxThreadPoolSize;
080 }
081
082 public void init() {
083 if (initDone.compareAndSet(false, true)) {
084 // If your OS/JVM combination has a good thread model, you may want to
085 // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead.
086 if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
087 executor = null;
088 } else if (executor == null) {
089 executor = createDefaultExecutor();
090 }
091 LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executor);
092 }
093 }
094
095 /**
096 * Performs a shutdown only, by which the thread pool is shutdown by not graceful nor aggressively.
097 *
098 * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService)
099 */
100 public void shutdown() {
101 if (executor != null) {
102 ThreadPoolUtils.shutdown(executor);
103 executor = null;
104 }
105 initDone.set(false);
106 }
107
108 /**
109 * Performs a shutdown now (aggressively) on the thread pool.
110 *
111 * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService)
112 */
113 public void shutdownNow() {
114 if (executor != null) {
115 ThreadPoolUtils.shutdownNow(executor);
116 executor = null;
117 }
118 initDone.set(false);
119 }
120
121 /**
122 * Performs a graceful shutdown.
123 *
124 * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService)
125 */
126 public void shutdownGraceful() {
127 if (executor != null) {
128 ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination);
129 executor = null;
130 }
131 initDone.set(false);
132 }
133
134 public TaskRunner createTaskRunner(Task task, String name) {
135 init();
136 if (executor != null) {
137 return new PooledTaskRunner(executor, task, maxIterationsPerRun);
138 } else {
139 return new DedicatedTaskRunner(task, name, priority, daemon);
140 }
141 }
142
143 public void execute(Runnable runnable) {
144 execute(runnable, name);
145 }
146
147 public void execute(Runnable runnable, String name) {
148 init();
149 LOG.trace("Execute[{}] runnable: {}", name, runnable);
150 if (executor != null) {
151 executor.execute(runnable);
152 } else {
153 doExecuteNewThread(runnable, name);
154 }
155 }
156
157 private void doExecuteNewThread(Runnable runnable, String name) {
158 String threadName = name + "-" + id.incrementAndGet();
159 Thread thread = new Thread(runnable, threadName);
160 thread.setDaemon(daemon);
161
162 LOG.trace("Created and running thread[{}]: {}", threadName, thread);
163 thread.start();
164 }
165
166 protected ExecutorService createDefaultExecutor() {
167 ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
168 public Thread newThread(Runnable runnable) {
169 String threadName = name + "-" + id.incrementAndGet();
170 Thread thread = new Thread(runnable, threadName);
171 thread.setDaemon(daemon);
172 thread.setPriority(priority);
173
174 LOG.trace("Created thread[{}]: {}", threadName, thread);
175 return thread;
176 }
177 });
178 if (rejectedTaskHandler != null) {
179 rc.setRejectedExecutionHandler(rejectedTaskHandler);
180 }
181 return rc;
182 }
183
184 public ExecutorService getExecutor() {
185 return executor;
186 }
187
188 public void setExecutor(ExecutorService executor) {
189 this.executor = executor;
190 }
191
192 public int getMaxIterationsPerRun() {
193 return maxIterationsPerRun;
194 }
195
196 public void setMaxIterationsPerRun(int maxIterationsPerRun) {
197 this.maxIterationsPerRun = maxIterationsPerRun;
198 }
199
200 public String getName() {
201 return name;
202 }
203
204 public void setName(String name) {
205 this.name = name;
206 }
207
208 public int getPriority() {
209 return priority;
210 }
211
212 public void setPriority(int priority) {
213 this.priority = priority;
214 }
215
216 public boolean isDaemon() {
217 return daemon;
218 }
219
220 public void setDaemon(boolean daemon) {
221 this.daemon = daemon;
222 }
223
224 public boolean isDedicatedTaskRunner() {
225 return dedicatedTaskRunner;
226 }
227
228 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
229 this.dedicatedTaskRunner = dedicatedTaskRunner;
230 }
231
232 public int getMaxThreadPoolSize() {
233 return maxThreadPoolSize;
234 }
235
236 public void setMaxThreadPoolSize(int maxThreadPoolSize) {
237 this.maxThreadPoolSize = maxThreadPoolSize;
238 }
239
240 public RejectedExecutionHandler getRejectedTaskHandler() {
241 return rejectedTaskHandler;
242 }
243
244 public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
245 this.rejectedTaskHandler = rejectedTaskHandler;
246 }
247
248 public long getShutdownAwaitTermination() {
249 return shutdownAwaitTermination;
250 }
251
252 public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
253 this.shutdownAwaitTermination = shutdownAwaitTermination;
254 }
255
256 }