001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.ResultSet;
023    import java.sql.SQLException;
024    import java.sql.Timestamp;
025    import java.util.Date;
026    import java.util.concurrent.TimeUnit;
027    import javax.sql.DataSource;
028    
029    import org.apache.activemq.broker.AbstractLocker;
030    import org.apache.activemq.store.PersistenceAdapter;
031    import org.apache.activemq.util.IOExceptionSupport;
032    import org.apache.activemq.util.ServiceStopper;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * Represents an exclusive lease on a database to avoid multiple brokers running
038     * against the same logical database.
039     * 
040     * @org.apache.xbean.XBean element="lease-database-locker"
041     * 
042     */
043    public class LeaseDatabaseLocker extends AbstractLocker {
044        private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
045        protected DataSource dataSource;
046        protected Statements statements;
047    
048        protected boolean stopping;
049        protected int maxAllowableDiffFromDBTime = 0;
050        protected long diffFromCurrentTime = Long.MAX_VALUE;
051        protected String leaseHolderId;
052        protected int queryTimeout = -1;
053        JDBCPersistenceAdapter persistenceAdapter;
054    
055    
056        public void configure(PersistenceAdapter adapter) throws IOException {
057            if (adapter instanceof JDBCPersistenceAdapter) {
058                this.persistenceAdapter = (JDBCPersistenceAdapter)adapter;
059                this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
060                this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
061            }
062        }
063        
064        public void doStart() throws Exception {
065            stopping = false;
066    
067            LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the Master broker");
068            String sql = statements.getLeaseObtainStatement();
069            LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
070    
071            while (!stopping) {
072                Connection connection = null;
073                PreparedStatement statement = null;
074                try {
075                    connection = getConnection();
076                    initTimeDiff(connection);
077    
078                    statement = connection.prepareStatement(sql);
079                    setQueryTimeout(statement);
080    
081                    final long now = System.currentTimeMillis() + diffFromCurrentTime;
082                    statement.setString(1, getLeaseHolderId());
083                    statement.setLong(2, now + lockAcquireSleepInterval);
084                    statement.setLong(3, now);
085    
086                    int result = statement.executeUpdate();
087                    if (result == 1) {
088                        // we got the lease, verify we still have it
089                        if (keepAlive()) {
090                            break;
091                        }
092                    }
093    
094                    reportLeasOwnerShipAndDuration(connection);
095    
096                } catch (Exception e) {
097                    LOG.debug(getLeaseHolderId() + " lease acquire failure: "+ e, e);
098                } finally {
099                    close(statement);
100                    close(connection);
101                }
102    
103                LOG.info(getLeaseHolderId() + " failed to acquire lease.  Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
104                TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
105            }
106            if (stopping) {
107                throw new RuntimeException(getLeaseHolderId() + " failing lease acquire due to stop");
108            }
109    
110            LOG.info(getLeaseHolderId() + ", becoming the master on dataSource: " + dataSource);
111        }
112    
113        private void setQueryTimeout(PreparedStatement statement) throws SQLException {
114            if (queryTimeout > 0) {
115                statement.setQueryTimeout(queryTimeout);
116            }
117        }
118    
119        private Connection getConnection() throws SQLException {
120            return dataSource.getConnection();
121        }
122    
123        private void close(Connection connection) {
124            if (null != connection) {
125                try {
126                    connection.close();
127                } catch (SQLException e1) {
128                    LOG.debug(getLeaseHolderId() + " caught exception while closing connection: " + e1, e1);
129                }
130            }
131        }
132    
133        private void close(PreparedStatement statement) {
134            if (null != statement) {
135                try {
136                    statement.close();
137                } catch (SQLException e1) {
138                    LOG.debug(getLeaseHolderId() + ", caught while closing statement: " + e1, e1);
139                }
140            }
141        }
142    
143        private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException {
144            PreparedStatement statement = null;
145            try {
146                statement = connection.prepareStatement(statements.getLeaseOwnerStatement());
147                ResultSet resultSet = statement.executeQuery();
148                while (resultSet.next()) {
149                    LOG.info(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2)));
150                }
151            } finally {
152                close(statement);
153            }
154        }
155    
156        protected long initTimeDiff(Connection connection) throws SQLException {
157            if (Long.MAX_VALUE == diffFromCurrentTime) {
158                if (maxAllowableDiffFromDBTime > 0) {
159                    diffFromCurrentTime = determineTimeDifference(connection);
160                } else {
161                    diffFromCurrentTime = 0l;
162                }
163            }
164            return diffFromCurrentTime;
165        }
166    
167        private long determineTimeDifference(Connection connection) throws SQLException {
168            PreparedStatement statement = connection.prepareStatement(statements.getCurrentDateTime());
169            ResultSet resultSet = statement.executeQuery();
170            long result = 0l;
171            if (resultSet.next()) {
172                Timestamp timestamp = resultSet.getTimestamp(1);
173                long diff = System.currentTimeMillis() - timestamp.getTime();
174                LOG.info(getLeaseHolderId() + " diff from db: " + diff + ", db time: " + timestamp);
175                if (diff > maxAllowableDiffFromDBTime || diff < -maxAllowableDiffFromDBTime) {
176                    // off by more than maxAllowableDiffFromDBTime so lets adjust
177                    result = diff;
178                }
179            }
180            return result;
181        }
182    
183        public void doStop(ServiceStopper stopper) throws Exception {
184            releaseLease();
185            stopping = true;
186        }
187    
188        private void releaseLease() {
189            Connection connection = null;
190            PreparedStatement statement = null;
191            try {
192                connection = getConnection();
193                statement = connection.prepareStatement(statements.getLeaseUpdateStatement());
194                statement.setString(1, null);
195                statement.setLong(2, 0l);
196                statement.setString(3, getLeaseHolderId());
197                if (statement.executeUpdate() == 1) {
198                    LOG.info(getLeaseHolderId() + ", released lease");
199                }
200            } catch (Exception e) {
201                LOG.error(getLeaseHolderId() + " failed to release lease: " + e, e);
202            } finally {
203                close(statement);
204                close(connection);
205            }
206        }
207    
208        @Override
209        public boolean keepAlive() throws IOException {
210            boolean result = false;
211            final String sql = statements.getLeaseUpdateStatement();
212            LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);
213    
214            Connection connection = null;
215            PreparedStatement statement = null;
216            try {
217                connection = getConnection();
218    
219                initTimeDiff(connection);
220                statement = connection.prepareStatement(sql);
221                setQueryTimeout(statement);
222    
223                final long now = System.currentTimeMillis() + diffFromCurrentTime;
224                statement.setString(1, getLeaseHolderId());
225                statement.setLong(2, now + lockAcquireSleepInterval);
226                statement.setString(3, getLeaseHolderId());
227    
228                result = (statement.executeUpdate() == 1);
229            } catch (Exception e) {
230                LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
231                IOException ioe = IOExceptionSupport.create(e);
232                persistenceAdapter.getBrokerService().handleIOException(ioe);
233                throw ioe;
234            } finally {
235                close(statement);
236                close(connection);
237            }
238            return result;
239        }
240    
241        public long getLockAcquireSleepInterval() {
242            return lockAcquireSleepInterval;
243        }
244    
245        public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
246            this.lockAcquireSleepInterval = lockAcquireSleepInterval;
247        }
248        
249        public int getQueryTimeout() {
250            return queryTimeout;
251        }
252    
253        public void setQueryTimeout(int queryTimeout) {
254            this.queryTimeout = queryTimeout;
255        }
256    
257        public String getLeaseHolderId() {
258            if (leaseHolderId == null) {
259                if (persistenceAdapter.getBrokerService() != null) {
260                    leaseHolderId = persistenceAdapter.getBrokerService().getBrokerName();
261                }
262            }
263            return leaseHolderId;
264        }
265    
266        public void setLeaseHolderId(String leaseHolderId) {
267            this.leaseHolderId = leaseHolderId;
268        }
269    
270        public int getMaxAllowableDiffFromDBTime() {
271            return maxAllowableDiffFromDBTime;
272        }
273    
274        public void setMaxAllowableDiffFromDBTime(int maxAllowableDiffFromDBTime) {
275            this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime;
276        }
277    }