001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import org.apache.activemq.store.PersistenceAdapter;
020import org.apache.activemq.util.ServiceStopper;
021import org.apache.activemq.util.ServiceSupport;
022import org.apache.activemq.util.ThreadPoolUtils;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025
026import java.io.IOException;
027import java.util.concurrent.ScheduledFuture;
028import java.util.concurrent.ScheduledThreadPoolExecutor;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.TimeUnit;
031
032/**
033 * Helper class for working with services that requires locking
034 */
035public abstract class LockableServiceSupport extends ServiceSupport implements Lockable, BrokerServiceAware {
036
037    private static final Logger LOG = LoggerFactory.getLogger(LockableServiceSupport.class);
038    boolean useLock = true;
039    Locker locker;
040    long lockKeepAlivePeriod = 0;
041    private ScheduledFuture<?> keepAliveTicket;
042    protected ScheduledThreadPoolExecutor clockDaemon;
043    protected BrokerService brokerService;
044
045    /**
046     * Initialize resources before locking
047     *
048     * @throws Exception
049     */
050    abstract public void init() throws Exception;
051
052    @Override
053    public void setUseLock(boolean useLock) {
054        this.useLock = useLock;
055    }
056
057    public boolean isUseLock() {
058        return this.useLock;
059    }
060
061    @Override
062    public void setLocker(Locker locker) throws IOException {
063        this.locker = locker;
064        locker.setLockable(this);
065        if (this instanceof PersistenceAdapter) {
066            this.locker.configure((PersistenceAdapter)this);
067        }
068    }
069
070    public Locker getLocker() throws IOException {
071        if (this.locker == null) {
072            setLocker(createDefaultLocker());
073        }
074        return this.locker;
075    }
076
077    @Override
078    public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
079        this.lockKeepAlivePeriod = lockKeepAlivePeriod;
080    }
081
082    @Override
083    public long getLockKeepAlivePeriod() {
084        return lockKeepAlivePeriod;
085    }
086
087    @Override
088    public void preStart() throws Exception {
089        init();
090        if (useLock) {
091            if (getLocker() == null) {
092                LOG.warn("No locker configured");
093            } else {
094                getLocker().start();
095                if (lockKeepAlivePeriod > 0) {
096                    keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
097                        public void run() {
098                            keepLockAlive();
099                        }
100                    }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
101                }
102            }
103        }
104    }
105
106    @Override
107    public void postStop(ServiceStopper stopper) throws Exception {
108        if (useLock) {
109            if (keepAliveTicket != null) {
110                keepAliveTicket.cancel(false);
111                keepAliveTicket = null;
112            }
113            if (locker != null) {
114                getLocker().stop();
115                locker = null;
116            }
117        }
118        ThreadPoolUtils.shutdown(clockDaemon);
119        clockDaemon = null;
120    }
121
122    protected void keepLockAlive() {
123        boolean stop = false;
124        try {
125            Locker locker = getLocker();
126            if (locker != null) {
127                if (!locker.keepAlive()) {
128                    stop = true;
129                }
130            }
131        } catch (SuppressReplyException e) {
132            LOG.warn("locker keepAlive resulted in", e);
133        } catch (IOException e) {
134            LOG.warn("locker keepAlive resulted in", e);
135        }
136        if (stop) {
137            stopBroker();
138        }
139    }
140
141    protected void stopBroker() {
142        // we can no longer keep the lock so lets fail
143        LOG.error("{}, no longer able to keep the exclusive lock so giving up being a master", brokerService.getBrokerName());
144        try {
145            if( brokerService.isRestartAllowed() ) {
146                brokerService.requestRestart();
147            }
148            brokerService.stop();
149        } catch (Exception e) {
150            LOG.warn("Failure occurred while stopping broker");
151        }
152    }
153
154    public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
155        if (clockDaemon == null) {
156            clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
157                public Thread newThread(Runnable runnable) {
158                    Thread thread = new Thread(runnable, "ActiveMQ Lock KeepAlive Timer");
159                    thread.setDaemon(true);
160                    return thread;
161                }
162            });
163        }
164        return clockDaemon;
165    }
166
167    public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
168        this.clockDaemon = clockDaemon;
169    }
170
171    @Override
172    public void setBrokerService(BrokerService brokerService) {
173        this.brokerService = brokerService;
174    }
175
176    public BrokerService getBrokerService() {
177        return this.brokerService;
178    }
179}