001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.thread;
018    
019    import java.util.concurrent.Executor;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.RejectedExecutionHandler;
022    import java.util.concurrent.SynchronousQueue;
023    import java.util.concurrent.ThreadFactory;
024    import java.util.concurrent.ThreadPoolExecutor;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicLong;
028    
029    import org.apache.activemq.util.ThreadPoolUtils;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * Manages the thread pool for long running tasks. Long running tasks are not
035     * always active but when they are active, they may need a few iterations of
036     * processing for them to become idle. The manager ensures that each task is
037     * processes but that no one task overtakes the system. This is kinda like
038     * cooperative multitasking.
039     *
040     * @org.apache.xbean.XBean
041     */
042    public class TaskRunnerFactory implements Executor {
043    
044        private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerFactory.class);
045        private ExecutorService executor;
046        private int maxIterationsPerRun;
047        private String name;
048        private int priority;
049        private boolean daemon;
050        private AtomicLong id = new AtomicLong(0);
051        private boolean dedicatedTaskRunner;
052        private long shutdownAwaitTermination = 30000;
053        private AtomicBoolean initDone = new AtomicBoolean(false);
054        private int maxThreadPoolSize = Integer.MAX_VALUE;
055        private RejectedExecutionHandler rejectedTaskHandler = null;
056    
057        public TaskRunnerFactory() {
058            this("ActiveMQ Task");
059        }
060    
061        public TaskRunnerFactory(String name) {
062            this(name, Thread.NORM_PRIORITY, true, 1000);
063        }
064    
065        private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
066            this(name,priority,daemon,maxIterationsPerRun,false);
067        }
068    
069        public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
070            this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, Integer.MAX_VALUE);
071        }
072    
073        public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) {
074            this.name = name;
075            this.priority = priority;
076            this.daemon = daemon;
077            this.maxIterationsPerRun = maxIterationsPerRun;
078            this.dedicatedTaskRunner = dedicatedTaskRunner;
079            this.maxThreadPoolSize = maxThreadPoolSize;
080        }
081    
082        public void init() {
083            if (initDone.compareAndSet(false, true)) {
084                // If your OS/JVM combination has a good thread model, you may want to
085                // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead.
086                if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
087                    executor = null;
088                } else if (executor == null) {
089                    executor = createDefaultExecutor();
090                }
091                LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executor);
092            }
093        }
094    
095        /**
096         * Performs a shutdown only, by which the thread pool is shutdown by not graceful nor aggressively.
097         *
098         * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService)
099         */
100        public void shutdown() {
101            if (executor != null) {
102                ThreadPoolUtils.shutdown(executor);
103                executor = null;
104            }
105            initDone.set(false);
106        }
107    
108        /**
109         * Performs a shutdown now (aggressively) on the thread pool.
110         *
111         * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService)
112         */
113        public void shutdownNow() {
114            if (executor != null) {
115                ThreadPoolUtils.shutdownNow(executor);
116                executor = null;
117            }
118            initDone.set(false);
119        }
120    
121        /**
122         * Performs a graceful shutdown.
123         *
124         * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService)
125         */
126        public void shutdownGraceful() {
127            if (executor != null) {
128                ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination);
129                executor = null;
130            }
131            initDone.set(false);
132        }
133    
134        public TaskRunner createTaskRunner(Task task, String name) {
135            init();
136            if (executor != null) {
137                return new PooledTaskRunner(executor, task, maxIterationsPerRun);
138            } else {
139                return new DedicatedTaskRunner(task, name, priority, daemon);
140            }
141        }
142    
143        public void execute(Runnable runnable) {
144            execute(runnable, name);
145        }
146    
147        public void execute(Runnable runnable, String name) {
148            init();
149            LOG.trace("Execute[{}] runnable: {}", name, runnable);
150            if (executor != null) {
151                executor.execute(runnable);
152            } else {
153                doExecuteNewThread(runnable, name);
154            }
155        }
156    
157        private void doExecuteNewThread(Runnable runnable, String name) {
158            String threadName = name + "-" + id.incrementAndGet();
159            Thread thread = new Thread(runnable, threadName);
160            thread.setDaemon(daemon);
161    
162            LOG.trace("Created and running thread[{}]: {}", threadName, thread);
163            thread.start();
164        }
165    
166        protected ExecutorService createDefaultExecutor() {
167            ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
168                public Thread newThread(Runnable runnable) {
169                    String threadName = name + "-" + id.incrementAndGet();
170                    Thread thread = new Thread(runnable, threadName);
171                    thread.setDaemon(daemon);
172                    thread.setPriority(priority);
173    
174                    LOG.trace("Created thread[{}]: {}", threadName, thread);
175                    return thread;
176                }
177            });
178            if (rejectedTaskHandler != null) {
179                rc.setRejectedExecutionHandler(rejectedTaskHandler);
180            }
181            return rc;
182        }
183    
184        public ExecutorService getExecutor() {
185            return executor;
186        }
187    
188        public void setExecutor(ExecutorService executor) {
189            this.executor = executor;
190        }
191    
192        public int getMaxIterationsPerRun() {
193            return maxIterationsPerRun;
194        }
195    
196        public void setMaxIterationsPerRun(int maxIterationsPerRun) {
197            this.maxIterationsPerRun = maxIterationsPerRun;
198        }
199    
200        public String getName() {
201            return name;
202        }
203    
204        public void setName(String name) {
205            this.name = name;
206        }
207    
208        public int getPriority() {
209            return priority;
210        }
211    
212        public void setPriority(int priority) {
213            this.priority = priority;
214        }
215    
216        public boolean isDaemon() {
217            return daemon;
218        }
219    
220        public void setDaemon(boolean daemon) {
221            this.daemon = daemon;
222        }
223    
224        public boolean isDedicatedTaskRunner() {
225            return dedicatedTaskRunner;
226        }
227    
228        public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
229            this.dedicatedTaskRunner = dedicatedTaskRunner;
230        }
231    
232        public int getMaxThreadPoolSize() {
233            return maxThreadPoolSize;
234        }
235    
236        public void setMaxThreadPoolSize(int maxThreadPoolSize) {
237            this.maxThreadPoolSize = maxThreadPoolSize;
238        }
239    
240        public RejectedExecutionHandler getRejectedTaskHandler() {
241            return rejectedTaskHandler;
242        }
243    
244        public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
245            this.rejectedTaskHandler = rejectedTaskHandler;
246        }
247    
248        public long getShutdownAwaitTermination() {
249            return shutdownAwaitTermination;
250        }
251    
252        public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
253            this.shutdownAwaitTermination = shutdownAwaitTermination;
254        }
255    
256    }