001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import org.apache.activemq.store.PersistenceAdapter;
020    import org.apache.activemq.util.ServiceStopper;
021    import org.apache.activemq.util.ServiceSupport;
022    import org.apache.activemq.util.ThreadPoolUtils;
023    import org.slf4j.Logger;
024    import org.slf4j.LoggerFactory;
025    
026    import java.io.IOException;
027    import java.util.concurrent.ScheduledFuture;
028    import java.util.concurrent.ScheduledThreadPoolExecutor;
029    import java.util.concurrent.ThreadFactory;
030    import java.util.concurrent.TimeUnit;
031    
032    /**
033     * Helper class for working with services that requires locking
034     */
035    public abstract class LockableServiceSupport extends ServiceSupport implements Lockable, BrokerServiceAware {
036    
037        private static final Logger LOG = LoggerFactory.getLogger(LockableServiceSupport.class);
038        boolean useLock = true;
039        Locker locker;
040        long lockKeepAlivePeriod = 0;
041        private ScheduledFuture<?> keepAliveTicket;
042        private ScheduledThreadPoolExecutor clockDaemon;
043        private BrokerService brokerService;
044    
045        /**
046         * Initialize resources before locking
047         *
048         * @throws Exception
049         */
050        abstract public void init() throws Exception;
051    
052        @Override
053        public void setUseLock(boolean useLock) {
054            this.useLock = useLock;
055        }
056    
057        @Override
058        public void setLocker(Locker locker) throws IOException {
059            this.locker = locker;
060            if (this instanceof PersistenceAdapter) {
061                this.locker.configure((PersistenceAdapter)this);
062            }
063        }
064    
065        public Locker getLocker() throws IOException {
066            if (this.locker == null) {
067                this.locker = createDefaultLocker();
068            }
069            return this.locker;
070        }
071    
072        @Override
073        public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
074            this.lockKeepAlivePeriod = lockKeepAlivePeriod;
075        }
076    
077        @Override
078        public void preStart() throws Exception {
079            init();
080            if (useLock) {
081                if (getLocker() == null) {
082                    LOG.warn("No locker configured");
083                } else {
084                    getLocker().start();
085                    if (lockKeepAlivePeriod > 0) {
086                        keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
087                            public void run() {
088                                keepLockAlive();
089                            }
090                        }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
091                    }
092                    if (brokerService != null) {
093                        brokerService.getBroker().nowMasterBroker();
094                    }
095                }
096            }
097        }
098    
099        @Override
100        public void postStop(ServiceStopper stopper) throws Exception {
101            if (useLock) {
102                if (keepAliveTicket != null) {
103                    keepAliveTicket.cancel(false);
104                    keepAliveTicket = null;
105                }
106                if (locker != null) {
107                    getLocker().stop();
108                }
109                ThreadPoolUtils.shutdown(clockDaemon);
110            }
111        }
112    
113        protected void keepLockAlive() {
114            boolean stop = false;
115            try {
116                Locker locker = getLocker();
117                if (locker != null) {
118                    if (!locker.keepAlive()) {
119                        stop = true;
120                    }
121                }
122            } catch (IOException e) {
123                LOG.warn("locker keepalive resulted in: " + e, e);
124            }
125            if (stop) {
126                stopBroker();
127            }
128        }
129    
130        protected void stopBroker() {
131            // we can no longer keep the lock so lets fail
132            LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
133            try {
134                brokerService.stop();
135            } catch (Exception e) {
136                LOG.warn("Failure occurred while stopping broker");
137            }
138        }
139    
140        public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
141            if (clockDaemon == null) {
142                clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
143                    public Thread newThread(Runnable runnable) {
144                        Thread thread = new Thread(runnable, "ActiveMQ Lock KeepAlive Timer");
145                        thread.setDaemon(true);
146                        return thread;
147                    }
148                });
149            }
150            return clockDaemon;
151        }
152    
153        @Override
154        public void setBrokerService(BrokerService brokerService) {
155            this.brokerService = brokerService;
156        }
157    }