001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import org.apache.activemq.store.PersistenceAdapter;
020 import org.apache.activemq.util.ServiceStopper;
021 import org.apache.activemq.util.ServiceSupport;
022 import org.apache.activemq.util.ThreadPoolUtils;
023 import org.slf4j.Logger;
024 import org.slf4j.LoggerFactory;
025
026 import java.io.IOException;
027 import java.util.concurrent.ScheduledFuture;
028 import java.util.concurrent.ScheduledThreadPoolExecutor;
029 import java.util.concurrent.ThreadFactory;
030 import java.util.concurrent.TimeUnit;
031
032 /**
033 * Helper class for working with services that requires locking
034 */
035 public abstract class LockableServiceSupport extends ServiceSupport implements Lockable, BrokerServiceAware {
036
037 private static final Logger LOG = LoggerFactory.getLogger(LockableServiceSupport.class);
038 boolean useLock = true;
039 Locker locker;
040 long lockKeepAlivePeriod = 0;
041 private ScheduledFuture<?> keepAliveTicket;
042 private ScheduledThreadPoolExecutor clockDaemon;
043 private BrokerService brokerService;
044
045 /**
046 * Initialize resources before locking
047 *
048 * @throws Exception
049 */
050 abstract public void init() throws Exception;
051
052 @Override
053 public void setUseLock(boolean useLock) {
054 this.useLock = useLock;
055 }
056
057 @Override
058 public void setLocker(Locker locker) throws IOException {
059 this.locker = locker;
060 if (this instanceof PersistenceAdapter) {
061 this.locker.configure((PersistenceAdapter)this);
062 }
063 }
064
065 public Locker getLocker() throws IOException {
066 if (this.locker == null) {
067 this.locker = createDefaultLocker();
068 }
069 return this.locker;
070 }
071
072 @Override
073 public void setLockKeepAlivePeriod(long lockKeepAlivePeriod) {
074 this.lockKeepAlivePeriod = lockKeepAlivePeriod;
075 }
076
077 @Override
078 public void preStart() throws Exception {
079 init();
080 if (useLock) {
081 if (getLocker() == null) {
082 LOG.warn("No locker configured");
083 } else {
084 getLocker().start();
085 if (lockKeepAlivePeriod > 0) {
086 keepAliveTicket = getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
087 public void run() {
088 keepLockAlive();
089 }
090 }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
091 }
092 if (brokerService != null) {
093 brokerService.getBroker().nowMasterBroker();
094 }
095 }
096 }
097 }
098
099 @Override
100 public void postStop(ServiceStopper stopper) throws Exception {
101 if (useLock) {
102 if (keepAliveTicket != null) {
103 keepAliveTicket.cancel(false);
104 keepAliveTicket = null;
105 }
106 if (locker != null) {
107 getLocker().stop();
108 }
109 ThreadPoolUtils.shutdown(clockDaemon);
110 }
111 }
112
113 protected void keepLockAlive() {
114 boolean stop = false;
115 try {
116 Locker locker = getLocker();
117 if (locker != null) {
118 if (!locker.keepAlive()) {
119 stop = true;
120 }
121 }
122 } catch (IOException e) {
123 LOG.warn("locker keepalive resulted in: " + e, e);
124 }
125 if (stop) {
126 stopBroker();
127 }
128 }
129
130 protected void stopBroker() {
131 // we can no longer keep the lock so lets fail
132 LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
133 try {
134 brokerService.stop();
135 } catch (Exception e) {
136 LOG.warn("Failure occurred while stopping broker");
137 }
138 }
139
140 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
141 if (clockDaemon == null) {
142 clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
143 public Thread newThread(Runnable runnable) {
144 Thread thread = new Thread(runnable, "ActiveMQ Lock KeepAlive Timer");
145 thread.setDaemon(true);
146 return thread;
147 }
148 });
149 }
150 return clockDaemon;
151 }
152
153 @Override
154 public void setBrokerService(BrokerService brokerService) {
155 this.brokerService = brokerService;
156 }
157 }