001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.util;
018    
019    import java.util.List;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.TimeUnit;
022    
023    import org.slf4j.Logger;
024    import org.slf4j.LoggerFactory;
025    
026    /**
027     * Utility methods for working with thread pools {@link ExecutorService}.
028     */
029    public final class ThreadPoolUtils {
030    
031        private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolUtils.class);
032    
033        public static final long DEFAULT_SHUTDOWN_AWAIT_TERMINATION = 10 * 1000L;
034    
035        /**
036         * Shutdown the given executor service only (ie not graceful shutdown).
037         *
038         * @see java.util.concurrent.ExecutorService#shutdown()
039         */
040        public static void shutdown(ExecutorService executorService) {
041            doShutdown(executorService, 0);
042        }
043    
044        /**
045         * Shutdown now the given executor service aggressively.
046         *
047         * @param executorService the executor service to shutdown now
048         * @return list of tasks that never commenced execution
049         * @see java.util.concurrent.ExecutorService#shutdownNow()
050         */
051        public static List<Runnable> shutdownNow(ExecutorService executorService) {
052            List<Runnable> answer = null;
053            if (!executorService.isShutdown()) {
054                LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
055                answer = executorService.shutdownNow();
056                if (LOG.isTraceEnabled()) {
057                    LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
058                            new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
059                }
060            }
061    
062            return answer;
063        }
064    
065        /**
066         * Shutdown the given executor service graceful at first, and then aggressively
067         * if the await termination timeout was hit.
068         * <p/>
069         * This implementation invokes the {@link #shutdownGraceful(java.util.concurrent.ExecutorService, long)}
070         * with a timeout value of {@link #DEFAULT_SHUTDOWN_AWAIT_TERMINATION} millis.
071         */
072        public static void shutdownGraceful(ExecutorService executorService) {
073            doShutdown(executorService, DEFAULT_SHUTDOWN_AWAIT_TERMINATION);
074        }
075    
076        /**
077         * Shutdown the given executor service graceful at first, and then aggressively
078         * if the await termination timeout was hit.
079         * <p/>
080         * Will try to perform an orderly shutdown by giving the running threads
081         * time to complete tasks, before going more aggressively by doing a
082         * {@link #shutdownNow(java.util.concurrent.ExecutorService)} which
083         * forces a shutdown. The parameter <tt>shutdownAwaitTermination</tt>
084         * is used as timeout value waiting for orderly shutdown to
085         * complete normally, before going aggressively.
086         *
087         * @param executorService the executor service to shutdown
088         * @param shutdownAwaitTermination timeout in millis to wait for orderly shutdown
089         */
090        public static void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) {
091            doShutdown(executorService, shutdownAwaitTermination);
092        }
093    
094        private static void doShutdown(ExecutorService executorService, long shutdownAwaitTermination) {
095            // code from Apache Camel - org.apache.camel.impl.DefaultExecutorServiceManager
096    
097            if (executorService == null) {
098                return;
099            }
100    
101            // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively
102            // and try shutting down again. In both cases we wait at most the given shutdown timeout value given
103            // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus
104            // we ought to shutdown much faster)
105            if (!executorService.isShutdown()) {
106                boolean warned = false;
107                StopWatch watch = new StopWatch();
108    
109                LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination);
110                executorService.shutdown();
111    
112                if (shutdownAwaitTermination > 0) {
113                    try {
114                        if (!awaitTermination(executorService, shutdownAwaitTermination)) {
115                            warned = true;
116                            LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
117                            executorService.shutdownNow();
118                            // we are now shutting down aggressively, so wait to see if we can completely shutdown or not
119                            if (!awaitTermination(executorService, shutdownAwaitTermination)) {
120                                LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
121                            }
122                        }
123                    } catch (InterruptedException e) {
124                        warned = true;
125                        LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
126                        // we were interrupted during shutdown, so force shutdown
127                        executorService.shutdownNow();
128                    }
129                }
130    
131                // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
132                if (warned) {
133                    LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
134                            new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
135                } else if (LOG.isDebugEnabled()) {
136                    LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
137                            new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
138                }
139            }
140        }
141    
142        /**
143         * Awaits the termination of the thread pool.
144         * <p/>
145         * This implementation will log every 2nd second at INFO level that we are waiting, so the end user
146         * can see we are not hanging in case it takes longer time to terminate the pool.
147         *
148         * @param executorService            the thread pool
149         * @param shutdownAwaitTermination   time in millis to use as timeout
150         * @return <tt>true</tt> if the pool is terminated, or <tt>false</tt> if we timed out
151         * @throws InterruptedException is thrown if we are interrupted during the waiting
152         */
153        public static boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException {
154            // log progress every 5th second so end user is aware of we are shutting down
155            StopWatch watch = new StopWatch();
156            long interval = Math.min(2000, shutdownAwaitTermination);
157            boolean done = false;
158            while (!done && interval > 0) {
159                if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
160                    done = true;
161                } else {
162                    LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService);
163                    // recalculate interval
164                    interval = Math.min(2000, shutdownAwaitTermination - watch.taken());
165                }
166            }
167    
168            return done;
169        }
170    }