001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.IOException;
020 import java.sql.Connection;
021 import java.sql.PreparedStatement;
022 import java.sql.ResultSet;
023 import java.sql.SQLException;
024 import java.sql.Timestamp;
025 import java.util.Date;
026 import java.util.concurrent.TimeUnit;
027 import javax.sql.DataSource;
028
029 import org.apache.activemq.broker.AbstractLocker;
030 import org.apache.activemq.store.PersistenceAdapter;
031 import org.apache.activemq.util.IOExceptionSupport;
032 import org.apache.activemq.util.ServiceStopper;
033 import org.slf4j.Logger;
034 import org.slf4j.LoggerFactory;
035
036 /**
037 * Represents an exclusive lease on a database to avoid multiple brokers running
038 * against the same logical database.
039 *
040 * @org.apache.xbean.XBean element="lease-database-locker"
041 *
042 */
043 public class LeaseDatabaseLocker extends AbstractLocker {
044 private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
045 protected DataSource dataSource;
046 protected Statements statements;
047
048 protected boolean stopping;
049 protected int maxAllowableDiffFromDBTime = 0;
050 protected long diffFromCurrentTime = Long.MAX_VALUE;
051 protected String leaseHolderId;
052 protected int queryTimeout = -1;
053 JDBCPersistenceAdapter persistenceAdapter;
054
055
056 public void configure(PersistenceAdapter adapter) throws IOException {
057 if (adapter instanceof JDBCPersistenceAdapter) {
058 this.persistenceAdapter = (JDBCPersistenceAdapter)adapter;
059 this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
060 this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
061 }
062 }
063
064 public void doStart() throws Exception {
065 stopping = false;
066
067 LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the Master broker");
068 String sql = statements.getLeaseObtainStatement();
069 LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
070
071 while (!stopping) {
072 Connection connection = null;
073 PreparedStatement statement = null;
074 try {
075 connection = getConnection();
076 initTimeDiff(connection);
077
078 statement = connection.prepareStatement(sql);
079 setQueryTimeout(statement);
080
081 final long now = System.currentTimeMillis() + diffFromCurrentTime;
082 statement.setString(1, getLeaseHolderId());
083 statement.setLong(2, now + lockAcquireSleepInterval);
084 statement.setLong(3, now);
085
086 int result = statement.executeUpdate();
087 if (result == 1) {
088 // we got the lease, verify we still have it
089 if (keepAlive()) {
090 break;
091 }
092 }
093
094 reportLeasOwnerShipAndDuration(connection);
095
096 } catch (Exception e) {
097 LOG.debug(getLeaseHolderId() + " lease acquire failure: "+ e, e);
098 } finally {
099 close(statement);
100 close(connection);
101 }
102
103 LOG.info(getLeaseHolderId() + " failed to acquire lease. Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
104 TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
105 }
106 if (stopping) {
107 throw new RuntimeException(getLeaseHolderId() + " failing lease acquire due to stop");
108 }
109
110 LOG.info(getLeaseHolderId() + ", becoming the master on dataSource: " + dataSource);
111 }
112
113 private void setQueryTimeout(PreparedStatement statement) throws SQLException {
114 if (queryTimeout > 0) {
115 statement.setQueryTimeout(queryTimeout);
116 }
117 }
118
119 private Connection getConnection() throws SQLException {
120 return dataSource.getConnection();
121 }
122
123 private void close(Connection connection) {
124 if (null != connection) {
125 try {
126 connection.close();
127 } catch (SQLException e1) {
128 LOG.debug(getLeaseHolderId() + " caught exception while closing connection: " + e1, e1);
129 }
130 }
131 }
132
133 private void close(PreparedStatement statement) {
134 if (null != statement) {
135 try {
136 statement.close();
137 } catch (SQLException e1) {
138 LOG.debug(getLeaseHolderId() + ", caught while closing statement: " + e1, e1);
139 }
140 }
141 }
142
143 private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException {
144 PreparedStatement statement = null;
145 try {
146 statement = connection.prepareStatement(statements.getLeaseOwnerStatement());
147 ResultSet resultSet = statement.executeQuery();
148 while (resultSet.next()) {
149 LOG.info(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2)));
150 }
151 } finally {
152 close(statement);
153 }
154 }
155
156 protected long initTimeDiff(Connection connection) throws SQLException {
157 if (Long.MAX_VALUE == diffFromCurrentTime) {
158 if (maxAllowableDiffFromDBTime > 0) {
159 diffFromCurrentTime = determineTimeDifference(connection);
160 } else {
161 diffFromCurrentTime = 0l;
162 }
163 }
164 return diffFromCurrentTime;
165 }
166
167 private long determineTimeDifference(Connection connection) throws SQLException {
168 PreparedStatement statement = connection.prepareStatement(statements.getCurrentDateTime());
169 ResultSet resultSet = statement.executeQuery();
170 long result = 0l;
171 if (resultSet.next()) {
172 Timestamp timestamp = resultSet.getTimestamp(1);
173 long diff = System.currentTimeMillis() - timestamp.getTime();
174 LOG.info(getLeaseHolderId() + " diff from db: " + diff + ", db time: " + timestamp);
175 if (diff > maxAllowableDiffFromDBTime || diff < -maxAllowableDiffFromDBTime) {
176 // off by more than maxAllowableDiffFromDBTime so lets adjust
177 result = diff;
178 }
179 }
180 return result;
181 }
182
183 public void doStop(ServiceStopper stopper) throws Exception {
184 releaseLease();
185 stopping = true;
186 }
187
188 private void releaseLease() {
189 Connection connection = null;
190 PreparedStatement statement = null;
191 try {
192 connection = getConnection();
193 statement = connection.prepareStatement(statements.getLeaseUpdateStatement());
194 statement.setString(1, null);
195 statement.setLong(2, 0l);
196 statement.setString(3, getLeaseHolderId());
197 if (statement.executeUpdate() == 1) {
198 LOG.info(getLeaseHolderId() + ", released lease");
199 }
200 } catch (Exception e) {
201 LOG.error(getLeaseHolderId() + " failed to release lease: " + e, e);
202 } finally {
203 close(statement);
204 close(connection);
205 }
206 }
207
208 @Override
209 public boolean keepAlive() throws IOException {
210 boolean result = false;
211 final String sql = statements.getLeaseUpdateStatement();
212 LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);
213
214 Connection connection = null;
215 PreparedStatement statement = null;
216 try {
217 connection = getConnection();
218
219 initTimeDiff(connection);
220 statement = connection.prepareStatement(sql);
221 setQueryTimeout(statement);
222
223 final long now = System.currentTimeMillis() + diffFromCurrentTime;
224 statement.setString(1, getLeaseHolderId());
225 statement.setLong(2, now + lockAcquireSleepInterval);
226 statement.setString(3, getLeaseHolderId());
227
228 result = (statement.executeUpdate() == 1);
229 } catch (Exception e) {
230 LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
231 IOException ioe = IOExceptionSupport.create(e);
232 persistenceAdapter.getBrokerService().handleIOException(ioe);
233 throw ioe;
234 } finally {
235 close(statement);
236 close(connection);
237 }
238 return result;
239 }
240
241 public long getLockAcquireSleepInterval() {
242 return lockAcquireSleepInterval;
243 }
244
245 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
246 this.lockAcquireSleepInterval = lockAcquireSleepInterval;
247 }
248
249 public int getQueryTimeout() {
250 return queryTimeout;
251 }
252
253 public void setQueryTimeout(int queryTimeout) {
254 this.queryTimeout = queryTimeout;
255 }
256
257 public String getLeaseHolderId() {
258 if (leaseHolderId == null) {
259 if (persistenceAdapter.getBrokerService() != null) {
260 leaseHolderId = persistenceAdapter.getBrokerService().getBrokerName();
261 }
262 }
263 return leaseHolderId;
264 }
265
266 public void setLeaseHolderId(String leaseHolderId) {
267 this.leaseHolderId = leaseHolderId;
268 }
269
270 public int getMaxAllowableDiffFromDBTime() {
271 return maxAllowableDiffFromDBTime;
272 }
273
274 public void setMaxAllowableDiffFromDBTime(int maxAllowableDiffFromDBTime) {
275 this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime;
276 }
277 }