001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import org.apache.activemq.store.PersistenceAdapter;
020    import org.apache.activemq.util.ServiceStopper;
021    import org.apache.activemq.util.ServiceSupport;
022    import org.apache.activemq.util.ThreadPoolUtils;
023    import org.slf4j.Logger;
024    import org.slf4j.LoggerFactory;
025    
026    import java.io.IOException;
027    import java.util.concurrent.ScheduledFuture;
028    import java.util.concurrent.ScheduledThreadPoolExecutor;
029    import java.util.concurrent.ThreadFactory;
030    import java.util.concurrent.TimeUnit;
031    
032    /**
033     * Helper class for working with services that requires locking
034     */
035    public abstract class LockableServiceSupport extends ServiceSupport implements Lockable, BrokerServiceAware {
036    
037        private static final Logger LOG = LoggerFactory.getLogger(LockableServiceSupport.class);
038        boolean useLock = true;
039        Locker locker;
040        long lockKeepAlivePeriod = 0;
041        private ScheduledFuture<?> keepAliveTicket;
042        private ScheduledThreadPoolExecutor clockDaemon;
043        protected BrokerService brokerService;
044    
045        /**
046         * Initialize resources before locking
047         *
048         * @throws Exception
049         */
050        abstract public void init() throws Exception;
051    
052        @Override
053        public void setUseLock(boolean useLock) {
054            this.useLock = useLock;
055        }
056    
057        @Override
058        public void setLocker(Locker locker) throws IOException {
059            this.locker = locker;
060            if (this instanceof PersistenceAdapter) {
061                this.locker.configure((PersistenceAdapter)this);
062            }
063        }
064    
065        public Locker getLocker() throws IOException {
066            if (this.locker == null) {
067                this.locker = createDefaultLocker();
068            }
069            return this.locker;
070        }
071    
072        @Override
073        public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
074            this.lockKeepAlivePeriod = lockKeepAlivePeriod;
075        }
076    
077        @Override
078        public void preStart() throws Exception {
079            init();
080            if (useLock) {
081                if (getLocker() == null) {
082                    LOG.warn("No locker configured");
083                } else {
084                    getLocker().start();
085                    if (lockKeepAlivePeriod > 0) {
086                        keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
087                            public void run() {
088                                keepLockAlive();
089                            }
090                        }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
091                    }
092                }
093            }
094        }
095    
096        @Override
097        public void postStop(ServiceStopper stopper) throws Exception {
098            if (useLock) {
099                if (keepAliveTicket != null) {
100                    keepAliveTicket.cancel(false);
101                    keepAliveTicket = null;
102                }
103                if (locker != null) {
104                    getLocker().stop();
105                }
106                ThreadPoolUtils.shutdown(clockDaemon);
107            }
108        }
109    
110        protected void keepLockAlive() {
111            boolean stop = false;
112            try {
113                Locker locker = getLocker();
114                if (locker != null) {
115                    if (!locker.keepAlive()) {
116                        stop = true;
117                    }
118                }
119            } catch (IOException e) {
120                LOG.warn("locker keepalive resulted in: " + e, e);
121            }
122            if (stop) {
123                stopBroker();
124            }
125        }
126    
127        protected void stopBroker() {
128            // we can no longer keep the lock so lets fail
129            LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
130            try {
131                brokerService.stop();
132            } catch (Exception e) {
133                LOG.warn("Failure occurred while stopping broker");
134            }
135        }
136    
137        public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
138            if (clockDaemon == null) {
139                clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
140                    public Thread newThread(Runnable runnable) {
141                        Thread thread = new Thread(runnable, "ActiveMQ Lock KeepAlive Timer");
142                        thread.setDaemon(true);
143                        return thread;
144                    }
145                });
146            }
147            return clockDaemon;
148        }
149    
150        @Override
151        public void setBrokerService(BrokerService brokerService) {
152            this.brokerService = brokerService;
153        }
154    }