001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.util;
018
019 import java.util.List;
020 import java.util.concurrent.ExecutorService;
021 import java.util.concurrent.TimeUnit;
022
023 import org.slf4j.Logger;
024 import org.slf4j.LoggerFactory;
025
026 /**
027 * Utility methods for working with thread pools {@link ExecutorService}.
028 */
029 public final class ThreadPoolUtils {
030
031 private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class);
032
033 public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 10 * 1000L;
034
035 /**
036 * Shutdown the given executor service only (ie not graceful shutdown).
037 *
038 * @see java.util.concurrent.ExecutorService#shutdown()
039 */
040 public static void shutdown(ExecutorService executorService) {
041 doShutdown(executorService, 0);
042 }
043
044 /**
045 * Shutdown now the given executor service aggressively.
046 *
047 * @param executorService the executor service to shutdown now
048 * @return list of tasks that never commenced execution
049 * @see java.util.concurrent.ExecutorService#shutdownNow()
050 */
051 public static List<Runnable> shutdownNow(ExecutorService executorService) {
052 List<Runnable> answer = null;
053 if (!executorService.isShutdown()) {
054 LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
055 answer = executorService.shutdownNow();
056 if (LOG.isTraceEnabled()) {
057 LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
058 new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
059 }
060 }
061
062 return answer;
063 }
064
065 /**
066 * Shutdown the given executor service graceful at first, and then aggressively
067 * if the await termination timeout was hit.
068 * <p/>
069 * This implementation invokes the {@link #shutdownGraceful(java.util.concurrent.ExecutorService, long)}
070 * with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis.
071 */
072 public static void shutdownGraceful(ExecutorService executorService) {
073 doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION);
074 }
075
076 /**
077 * Shutdown the given executor service graceful at first, and then aggressively
078 * if the await termination timeout was hit.
079 * <p/>
080 * Will try to perform an orderly shutdown by giving the running threads
081 * time to complete tasks, before going more aggressively by doing a
082 * {@link #shutdownNow(java.util.concurrent.ExecutorService)} which
083 * forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
084 * is used as timeout value waiting for orderly shutdown to
085 * complete normally, before going aggressively.
086 *
087 * @param executorService the executor service to shutdown
088 * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
089 */
090 public static void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) {
091 doShutdown(executorService, shutdownAwaitTermination);
092 }
093
094 private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination) {
095 // code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager
096
097 if (executorService == null) {
098 return;
099 }
100
101 // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively
102 // and try shutting down again. In both cases we wait at most the given shutdown timeout value given
103 // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus
104 // we ought to shutdown much faster)
105 if (!executorService.isShutdown()) {
106 boolean warned = false;
107 StopWatch watch = new StopWatch();
108
109 LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination);
110 executorService.shutdown();
111
112 if (shutdownAwaitTermination > 0) {
113 try {
114 if (!awaitTermination(executorService, shutdownAwaitTermination)) {
115 warned = true;
116 LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
117 executorService.shutdownNow();
118 // we are now shutting down aggressively, so wait to see if we can completely shutdown or not
119 if (!awaitTermination(executorService, shutdownAwaitTermination)) {
120 LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
121 }
122 }
123 } catch (InterruptedException e) {
124 warned = true;
125 LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
126 // we were interrupted during shutdown, so force shutdown
127 executorService.shutdownNow();
128 }
129 }
130
131 // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
132 if (warned) {
133 LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
134 new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
135 } else if (LOG.isDebugEnabled()) {
136 LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
137 new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
138 }
139 }
140 }
141
142 /**
143 * Awaits the termination of the thread pool.
144 * <p/>
145 * This implementation will log every 2nd second at INFO level that we are waiting, so the end user
146 * can see we are not hanging in case it takes longer time to terminate the pool.
147 *
148 * @param executorService the thread pool
149 * @param shutdownAwaitTermination time in millis to use as timeout
150 * @return <tt>true</tt> if the pool is terminated, or <tt>false</tt> if we timed out
151 * @throws InterruptedException is thrown if we are interrupted during the waiting
152 */
153 public static boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException {
154 // log progress every 5th second so end user is aware of we are shutting down
155 StopWatch watch = new StopWatch();
156 long interval = Math.min(2000, shutdownAwaitTermination);
157 boolean done = false;
158 while (!done && interval > 0) {
159 if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
160 done = true;
161 } else {
162 LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService);
163 // recalculate interval
164 interval = Math.min(2000, shutdownAwaitTermination - watch.taken());
165 }
166 }
167
168 return done;
169 }
170 }