001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.IOException;
020 import java.sql.Connection;
021 import java.sql.PreparedStatement;
022 import java.sql.SQLException;
023 import java.sql.SQLFeatureNotSupportedException;
024
025 import javax.sql.DataSource;
026
027 import org.apache.activemq.broker.AbstractLocker;
028 import org.apache.activemq.store.PersistenceAdapter;
029 import org.apache.activemq.util.Handler;
030 import org.apache.activemq.util.ServiceStopper;
031 import org.slf4j.Logger;
032 import org.slf4j.LoggerFactory;
033
034 /**
035 * Represents an exclusive lock on a database to avoid multiple brokers running
036 * against the same logical database.
037 *
038 * @org.apache.xbean.XBean element="database-locker"
039 *
040 */
041 public class DefaultDatabaseLocker extends AbstractLocker {
042 public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 1000;
043 private static final Logger LOG = LoggerFactory.getLogger(DefaultDatabaseLocker.class);
044 protected DataSource dataSource;
045 protected Statements statements;
046
047 protected volatile PreparedStatement lockCreateStatement;
048 protected volatile PreparedStatement lockUpdateStatement;
049 protected volatile Connection connection;
050 protected volatile boolean stopping;
051 protected Handler<Exception> exceptionHandler;
052 protected int queryTimeout = 10;
053
054 public void configure(PersistenceAdapter adapter) throws IOException {
055 if (adapter instanceof JDBCPersistenceAdapter) {
056 this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
057 this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
058 }
059 lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
060 }
061
062 public void doStart() throws Exception {
063 stopping = false;
064
065 LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
066 String sql = statements.getLockCreateStatement();
067 LOG.debug("Locking Query is "+sql);
068
069 while (true) {
070 try {
071 connection = dataSource.getConnection();
072 connection.setAutoCommit(false);
073 lockCreateStatement = connection.prepareStatement(sql);
074 lockCreateStatement.execute();
075 break;
076 } catch (Exception e) {
077 try {
078 if (stopping) {
079 throw new Exception(
080 "Cannot start broker as being asked to shut down. "
081 + "Interrupted attempt to acquire lock: "
082 + e, e);
083 }
084 if (exceptionHandler != null) {
085 try {
086 exceptionHandler.handle(e);
087 } catch (Throwable handlerException) {
088 LOG.error( "The exception handler "
089 + exceptionHandler.getClass().getCanonicalName()
090 + " threw this exception: "
091 + handlerException
092 + " while trying to handle this exception: "
093 + e, handlerException);
094 }
095
096 } else {
097 LOG.debug("Lock failure: "+ e, e);
098 }
099 } finally {
100 // Let's make sure the database connection is properly
101 // closed when an error occurs so that we're not leaking
102 // connections
103 if (null != connection) {
104 try {
105 connection.close();
106 } catch (SQLException e1) {
107 LOG.error("Caught exception while closing connection: " + e1, e1);
108 }
109
110 connection = null;
111 }
112 }
113 } finally {
114 if (null != lockCreateStatement) {
115 try {
116 lockCreateStatement.close();
117 } catch (SQLException e1) {
118 LOG.debug("Caught while closing statement: " + e1, e1);
119 }
120 lockCreateStatement = null;
121 }
122 }
123
124 LOG.info("Failed to acquire lock. Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
125 try {
126 Thread.sleep(lockAcquireSleepInterval);
127 } catch (InterruptedException ie) {
128 LOG.warn("Master lock retry sleep interrupted", ie);
129 }
130 }
131
132 LOG.info("Becoming the master on dataSource: " + dataSource);
133 }
134
135 public void doStop(ServiceStopper stopper) throws Exception {
136 stopping = true;
137 try {
138 if (lockCreateStatement != null) {
139 lockCreateStatement.cancel();
140 }
141 } catch (SQLFeatureNotSupportedException e) {
142 LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e);
143 }
144 try {
145 if (lockUpdateStatement != null) {
146 lockUpdateStatement.cancel();
147 }
148 } catch (SQLFeatureNotSupportedException e) {
149 LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e);
150 }
151
152 // when the connection is closed from an outside source (lost TCP
153 // connection, db server, etc) and this connection is managed by a pool
154 // it is important to close the connection so that we don't leak
155 // connections
156
157 if (connection != null) {
158 try {
159 connection.rollback();
160 } catch (SQLException sqle) {
161 LOG.warn("Exception while rollbacking the connection on shutdown. This exception is ignored.", sqle);
162 } finally {
163 try {
164 connection.close();
165 } catch (SQLException ignored) {
166 LOG.debug("Exception while closing connection on shutdown. This exception is ignored.", ignored);
167 }
168 lockCreateStatement = null;
169 }
170 }
171 }
172
173 public boolean keepAlive() throws IOException {
174 boolean result = false;
175 try {
176 lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());
177 lockUpdateStatement.setLong(1, System.currentTimeMillis());
178 if (queryTimeout > 0) {
179 lockUpdateStatement.setQueryTimeout(queryTimeout);
180 }
181 int rows = lockUpdateStatement.executeUpdate();
182 if (rows == 1) {
183 result=true;
184 }
185 } catch (Exception e) {
186 LOG.error("Failed to update database lock: " + e, e);
187 } finally {
188 if (lockUpdateStatement != null) {
189 try {
190 lockUpdateStatement.close();
191 } catch (SQLException e) {
192 LOG.error("Failed to close statement",e);
193 }
194 lockUpdateStatement = null;
195 }
196 }
197 return result;
198 }
199
200 public long getLockAcquireSleepInterval() {
201 return lockAcquireSleepInterval;
202 }
203
204 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
205 this.lockAcquireSleepInterval = lockAcquireSleepInterval;
206 }
207
208 public Handler getExceptionHandler() {
209 return exceptionHandler;
210 }
211
212 public void setExceptionHandler(Handler exceptionHandler) {
213 this.exceptionHandler = exceptionHandler;
214 }
215
216 public int getQueryTimeout() {
217 return queryTimeout;
218 }
219
220 public void setQueryTimeout(int queryTimeout) {
221 this.queryTimeout = queryTimeout;
222 }
223 }