001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.thread;
018    
019    import java.util.concurrent.Executor;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.RejectedExecutionHandler;
022    import java.util.concurrent.SynchronousQueue;
023    import java.util.concurrent.ThreadFactory;
024    import java.util.concurrent.ThreadPoolExecutor;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    import java.util.concurrent.atomic.AtomicLong;
028    
029    import org.apache.activemq.util.ThreadPoolUtils;
030    import org.slf4j.Logger;
031    import org.slf4j.LoggerFactory;
032    
033    /**
034     * Manages the thread pool for long running tasks. Long running tasks are not
035     * always active but when they are active, they may need a few iterations of
036     * processing for them to become idle. The manager ensures that each task is
037     * processes but that no one task overtakes the system. This is kinda like
038     * cooperative multitasking.
039     *
040     * @org.apache.xbean.XBean
041     */
042    public class TaskRunnerFactory implements Executor {
043    
044        private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerFactory.class);
045        private ExecutorService executor;
046        private int maxIterationsPerRun;
047        private String name;
048        private int priority;
049        private boolean daemon;
050        private final AtomicLong id = new AtomicLong(0);
051        private boolean dedicatedTaskRunner;
052        private long shutdownAwaitTermination = 30000;
053        private final AtomicBoolean initDone = new AtomicBoolean(false);
054        private int maxThreadPoolSize = Integer.MAX_VALUE;
055        private RejectedExecutionHandler rejectedTaskHandler = null;
056    
057        public TaskRunnerFactory() {
058            this("ActiveMQ Task");
059        }
060    
061        public TaskRunnerFactory(String name) {
062            this(name, Thread.NORM_PRIORITY, true, 1000);
063        }
064    
065        private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) {
066            this(name,priority,daemon,maxIterationsPerRun,false);
067        }
068    
069        public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) {
070            this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, Integer.MAX_VALUE);
071        }
072    
073        public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) {
074            this.name = name;
075            this.priority = priority;
076            this.daemon = daemon;
077            this.maxIterationsPerRun = maxIterationsPerRun;
078            this.dedicatedTaskRunner = dedicatedTaskRunner;
079            this.maxThreadPoolSize = maxThreadPoolSize;
080        }
081    
082        public void init() {
083            if (initDone.compareAndSet(false, true)) {
084                // If your OS/JVM combination has a good thread model, you may want to
085                // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead.
086                if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) {
087                    executor = null;
088                } else if (executor == null) {
089                    executor = createDefaultExecutor();
090                }
091                LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executor);
092            }
093        }
094    
095        /**
096         * Performs a shutdown only, by which the thread pool is shutdown by not graceful nor aggressively.
097         *
098         * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService)
099         */
100        public void shutdown() {
101            if (executor != null) {
102                ThreadPoolUtils.shutdown(executor);
103                executor = null;
104            }
105            initDone.set(false);
106        }
107    
108        /**
109         * Performs a shutdown now (aggressively) on the thread pool.
110         *
111         * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService)
112         */
113        public void shutdownNow() {
114            if (executor != null) {
115                ThreadPoolUtils.shutdownNow(executor);
116                executor = null;
117            }
118            initDone.set(false);
119        }
120    
121        /**
122         * Performs a graceful shutdown.
123         *
124         * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService)
125         */
126        public void shutdownGraceful() {
127            if (executor != null) {
128                ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination);
129                executor = null;
130            }
131            initDone.set(false);
132        }
133    
134        public TaskRunner createTaskRunner(Task task, String name) {
135            init();
136            if (executor != null) {
137                return new PooledTaskRunner(executor, task, maxIterationsPerRun);
138            } else {
139                return new DedicatedTaskRunner(task, name, priority, daemon);
140            }
141        }
142    
143        @Override
144        public void execute(Runnable runnable) {
145            execute(runnable, name);
146        }
147    
148        public void execute(Runnable runnable, String name) {
149            init();
150            LOG.trace("Execute[{}] runnable: {}", name, runnable);
151            if (executor != null) {
152                executor.execute(runnable);
153            } else {
154                doExecuteNewThread(runnable, name);
155            }
156        }
157    
158        private void doExecuteNewThread(Runnable runnable, String name) {
159            String threadName = name + "-" + id.incrementAndGet();
160            Thread thread = new Thread(runnable, threadName);
161            thread.setDaemon(daemon);
162    
163            LOG.trace("Created and running thread[{}]: {}", threadName, thread);
164            thread.start();
165        }
166    
167        protected ExecutorService createDefaultExecutor() {
168            ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
169                @Override
170                public Thread newThread(Runnable runnable) {
171                    String threadName = name + "-" + id.incrementAndGet();
172                    Thread thread = new Thread(runnable, threadName);
173                    thread.setDaemon(daemon);
174                    thread.setPriority(priority);
175    
176                    LOG.trace("Created thread[{}]: {}", threadName, thread);
177                    return thread;
178                }
179            });
180            if (rejectedTaskHandler != null) {
181                rc.setRejectedExecutionHandler(rejectedTaskHandler);
182            }
183            return rc;
184        }
185    
186        public ExecutorService getExecutor() {
187            return executor;
188        }
189    
190        public void setExecutor(ExecutorService executor) {
191            this.executor = executor;
192        }
193    
194        public int getMaxIterationsPerRun() {
195            return maxIterationsPerRun;
196        }
197    
198        public void setMaxIterationsPerRun(int maxIterationsPerRun) {
199            this.maxIterationsPerRun = maxIterationsPerRun;
200        }
201    
202        public String getName() {
203            return name;
204        }
205    
206        public void setName(String name) {
207            this.name = name;
208        }
209    
210        public int getPriority() {
211            return priority;
212        }
213    
214        public void setPriority(int priority) {
215            this.priority = priority;
216        }
217    
218        public boolean isDaemon() {
219            return daemon;
220        }
221    
222        public void setDaemon(boolean daemon) {
223            this.daemon = daemon;
224        }
225    
226        public boolean isDedicatedTaskRunner() {
227            return dedicatedTaskRunner;
228        }
229    
230        public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
231            this.dedicatedTaskRunner = dedicatedTaskRunner;
232        }
233    
234        public int getMaxThreadPoolSize() {
235            return maxThreadPoolSize;
236        }
237    
238        public void setMaxThreadPoolSize(int maxThreadPoolSize) {
239            this.maxThreadPoolSize = maxThreadPoolSize;
240        }
241    
242        public RejectedExecutionHandler getRejectedTaskHandler() {
243            return rejectedTaskHandler;
244        }
245    
246        public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
247            this.rejectedTaskHandler = rejectedTaskHandler;
248        }
249    
250        public long getShutdownAwaitTermination() {
251            return shutdownAwaitTermination;
252        }
253    
254        public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
255            this.shutdownAwaitTermination = shutdownAwaitTermination;
256        }
257    
258        private static int getDefaultKeepAliveTime() {
259            return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.keepAliveTime", 30);
260        }
261    }