001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.ResultSet;
023    import java.sql.SQLException;
024    import java.sql.Timestamp;
025    import java.util.Date;
026    import java.util.concurrent.TimeUnit;
027    import org.apache.activemq.util.IOExceptionSupport;
028    import org.apache.activemq.util.ServiceStopper;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * Represents an exclusive lease on a database to avoid multiple brokers running
034     * against the same logical database.
035     * 
036     * @org.apache.xbean.XBean element="lease-database-locker"
037     * 
038     */
039    public class LeaseDatabaseLocker extends AbstractJDBCLocker {
040        private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
041    
042        protected int maxAllowableDiffFromDBTime = 0;
043        protected long diffFromCurrentTime = Long.MAX_VALUE;
044        protected String leaseHolderId;
045    
046        public void doStart() throws Exception {
047    
048            if (lockAcquireSleepInterval < lockable.getLockKeepAlivePeriod()) {
049                LOG.warn("LockableService keep alive period: " + lockable.getLockKeepAlivePeriod()
050                        + ", which renews the lease, is less than lockAcquireSleepInterval: " + lockAcquireSleepInterval
051                        + ", the lease duration. These values will allow the lease to expire.");
052            }
053    
054            LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the master");
055            String sql = statements.getLeaseObtainStatement();
056            LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
057    
058            long now = 0l;
059            while (!isStopping()) {
060                Connection connection = null;
061                PreparedStatement statement = null;
062                try {
063                    connection = getConnection();
064                    initTimeDiff(connection);
065    
066                    statement = connection.prepareStatement(sql);
067                    setQueryTimeout(statement);
068    
069                    now = System.currentTimeMillis() + diffFromCurrentTime;
070                    statement.setString(1, getLeaseHolderId());
071                    statement.setLong(2, now + lockAcquireSleepInterval);
072                    statement.setLong(3, now);
073    
074                    int result = statement.executeUpdate();
075                    if (result == 1) {
076                        // we got the lease, verify we still have it
077                        if (keepAlive()) {
078                            break;
079                        }
080                    }
081    
082                    reportLeasOwnerShipAndDuration(connection);
083    
084                } catch (Exception e) {
085                    LOG.debug(getLeaseHolderId() + " lease acquire failure: "+ e, e);
086                } finally {
087                    close(statement);
088                    close(connection);
089                }
090    
091                LOG.info(getLeaseHolderId() + " failed to acquire lease.  Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
092                TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
093            }
094            if (isStopping()) {
095                throw new RuntimeException(getLeaseHolderId() + " failing lease acquire due to stop");
096            }
097    
098            LOG.info(getLeaseHolderId() + ", becoming master with lease expiry " + new Date(now) + " on dataSource: " + dataSource);
099        }
100    
101        private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException {
102            PreparedStatement statement = null;
103            try {
104                statement = connection.prepareStatement(statements.getLeaseOwnerStatement());
105                ResultSet resultSet = statement.executeQuery();
106                while (resultSet.next()) {
107                    LOG.info(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2)));
108                }
109            } finally {
110                close(statement);
111            }
112        }
113    
114        protected long initTimeDiff(Connection connection) throws SQLException {
115            if (Long.MAX_VALUE == diffFromCurrentTime) {
116                if (maxAllowableDiffFromDBTime > 0) {
117                    diffFromCurrentTime = determineTimeDifference(connection);
118                } else {
119                    diffFromCurrentTime = 0l;
120                }
121            }
122            return diffFromCurrentTime;
123        }
124    
125        protected long determineTimeDifference(Connection connection) throws SQLException {
126            PreparedStatement statement = connection.prepareStatement(statements.getCurrentDateTime());
127            ResultSet resultSet = statement.executeQuery();
128            long result = 0l;
129            if (resultSet.next()) {
130                Timestamp timestamp = resultSet.getTimestamp(1);
131                long diff = System.currentTimeMillis() - timestamp.getTime();
132                if (Math.abs(diff) > maxAllowableDiffFromDBTime) {
133                    // off by more than maxAllowableDiffFromDBTime so lets adjust
134                    result = (-diff);
135                }
136                LOG.info(getLeaseHolderId() + " diff adjust from db: " + result + ", db time: " + timestamp);
137            }
138            return result;
139        }
140    
141        public void doStop(ServiceStopper stopper) throws Exception {
142            if (lockable.getBrokerService() != null && lockable.getBrokerService().isRestartRequested()) {
143                // keep our lease for restart
144                return;
145            }
146            releaseLease();
147        }
148    
149        private void releaseLease() {
150            Connection connection = null;
151            PreparedStatement statement = null;
152            try {
153                connection = getConnection();
154                statement = connection.prepareStatement(statements.getLeaseUpdateStatement());
155                statement.setString(1, null);
156                statement.setLong(2, 0l);
157                statement.setString(3, getLeaseHolderId());
158                if (statement.executeUpdate() == 1) {
159                    LOG.info(getLeaseHolderId() + ", released lease");
160                }
161            } catch (Exception e) {
162                LOG.error(getLeaseHolderId() + " failed to release lease: " + e, e);
163            } finally {
164                close(statement);
165                close(connection);
166            }
167        }
168    
169        @Override
170        public boolean keepAlive() throws IOException {
171            boolean result = false;
172            final String sql = statements.getLeaseUpdateStatement();
173            LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);
174    
175            Connection connection = null;
176            PreparedStatement statement = null;
177            try {
178                connection = getConnection();
179    
180                initTimeDiff(connection);
181                statement = connection.prepareStatement(sql);
182                setQueryTimeout(statement);
183    
184                final long now = System.currentTimeMillis() + diffFromCurrentTime;
185                statement.setString(1, getLeaseHolderId());
186                statement.setLong(2, now + lockAcquireSleepInterval);
187                statement.setString(3, getLeaseHolderId());
188    
189                result = (statement.executeUpdate() == 1);
190    
191                if (!result) {
192                    reportLeasOwnerShipAndDuration(connection);
193                }
194            } catch (Exception e) {
195                LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
196                IOException ioe = IOExceptionSupport.create(e);
197                lockable.getBrokerService().handleIOException(ioe);
198                throw ioe;
199            } finally {
200                close(statement);
201                close(connection);
202            }
203            return result;
204        }
205    
206        public String getLeaseHolderId() {
207            if (leaseHolderId == null) {
208                if (lockable.getBrokerService() != null) {
209                    leaseHolderId = lockable.getBrokerService().getBrokerName();
210                }
211            }
212            return leaseHolderId;
213        }
214    
215        public void setLeaseHolderId(String leaseHolderId) {
216            this.leaseHolderId = leaseHolderId;
217        }
218    
219        public int getMaxAllowableDiffFromDBTime() {
220            return maxAllowableDiffFromDBTime;
221        }
222    
223        public void setMaxAllowableDiffFromDBTime(int maxAllowableDiffFromDBTime) {
224            this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime;
225        }
226    
227        @Override
228        public String toString() {
229            return "LeaseDatabaseLocker owner:" + leaseHolderId + ",duration:" + lockAcquireSleepInterval + ",renew:" + lockAcquireSleepInterval;
230        }
231    }