001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.ResultSet;
023    import java.sql.SQLException;
024    import java.sql.Timestamp;
025    import java.util.Date;
026    import java.util.concurrent.TimeUnit;
027    import javax.sql.DataSource;
028    
029    import org.apache.activemq.broker.AbstractLocker;
030    import org.apache.activemq.store.PersistenceAdapter;
031    import org.apache.activemq.util.IOExceptionSupport;
032    import org.apache.activemq.util.ServiceStopper;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * Represents an exclusive lease on a database to avoid multiple brokers running
038     * against the same logical database.
039     * 
040     * @org.apache.xbean.XBean element="lease-database-locker"
041     * 
042     */
043    public class LeaseDatabaseLocker extends AbstractLocker {
044        private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
045        public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 5000;
046        protected DataSource dataSource;
047        protected Statements statements;
048    
049        protected boolean stopping;
050        protected int maxAllowableDiffFromDBTime = 0;
051        protected long diffFromCurrentTime = Long.MAX_VALUE;
052        protected String leaseHolderId;
053        protected int queryTimeout = -1;
054        JDBCPersistenceAdapter persistenceAdapter;
055    
056    
057        public void configure(PersistenceAdapter adapter) throws IOException {
058            if (adapter instanceof JDBCPersistenceAdapter) {
059                this.persistenceAdapter = (JDBCPersistenceAdapter)adapter;
060                this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
061                this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
062            }
063            lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
064        }
065        
066        public void doStart() throws Exception {
067            stopping = false;
068    
069            LOG.info(getLeaseHolderId() + " attempting to acquire exclusive lease to become the Master broker");
070            String sql = statements.getLeaseObtainStatement();
071            LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
072    
073            while (!stopping) {
074                Connection connection = null;
075                PreparedStatement statement = null;
076                try {
077                    connection = getConnection();
078                    initTimeDiff(connection);
079    
080                    statement = connection.prepareStatement(sql);
081                    setQueryTimeout(statement);
082    
083                    final long now = System.currentTimeMillis() + diffFromCurrentTime;
084                    statement.setString(1, getLeaseHolderId());
085                    statement.setLong(2, now + lockAcquireSleepInterval);
086                    statement.setLong(3, now);
087    
088                    int result = statement.executeUpdate();
089                    if (result == 1) {
090                        // we got the lease, verify we still have it
091                        if (keepAlive()) {
092                            break;
093                        }
094                    }
095    
096                    reportLeasOwnerShipAndDuration(connection);
097    
098                } catch (Exception e) {
099                    LOG.debug(getLeaseHolderId() + " lease aquire failure: "+ e, e);
100                } finally {
101                    close(statement);
102                    close(connection);
103                }
104    
105                LOG.info(getLeaseHolderId() + " failed to acquire lease.  Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
106                TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
107            }
108            if (stopping) {
109                throw new RuntimeException(getLeaseHolderId() + " failing lease aquire due to stop");
110            }
111    
112            LOG.info(getLeaseHolderId() + ", becoming the master on dataSource: " + dataSource);
113        }
114    
115        private void setQueryTimeout(PreparedStatement statement) throws SQLException {
116            if (queryTimeout > 0) {
117                statement.setQueryTimeout(queryTimeout);
118            }
119        }
120    
121        private Connection getConnection() throws SQLException {
122            return dataSource.getConnection();
123        }
124    
125        private void close(Connection connection) {
126            if (null != connection) {
127                try {
128                    connection.close();
129                } catch (SQLException e1) {
130                    LOG.debug(getLeaseHolderId() + " caught exception while closing connection: " + e1, e1);
131                }
132            }
133        }
134    
135        private void close(PreparedStatement statement) {
136            if (null != statement) {
137                try {
138                    statement.close();
139                } catch (SQLException e1) {
140                    LOG.debug(getLeaseHolderId() + ", caught while closing statement: " + e1, e1);
141                }
142            }
143        }
144    
145        private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException {
146            PreparedStatement statement = null;
147            try {
148                statement = connection.prepareStatement(statements.getLeaseOwnerStatement());
149                ResultSet resultSet = statement.executeQuery();
150                while (resultSet.next()) {
151                    LOG.info(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2)));
152                }
153            } finally {
154                close(statement);
155            }
156        }
157    
158        protected long initTimeDiff(Connection connection) throws SQLException {
159            if (Long.MAX_VALUE == diffFromCurrentTime) {
160                if (maxAllowableDiffFromDBTime > 0) {
161                    diffFromCurrentTime = determineTimeDifference(connection);
162                } else {
163                    diffFromCurrentTime = 0l;
164                }
165            }
166            return diffFromCurrentTime;
167        }
168    
169        private long determineTimeDifference(Connection connection) throws SQLException {
170            PreparedStatement statement = connection.prepareStatement(statements.getCurrentDateTime());
171            ResultSet resultSet = statement.executeQuery();
172            long result = 0l;
173            if (resultSet.next()) {
174                Timestamp timestamp = resultSet.getTimestamp(1);
175                long diff = System.currentTimeMillis() - timestamp.getTime();
176                LOG.info(getLeaseHolderId() + " diff from db: " + diff + ", db time: " + timestamp);
177                if (diff > maxAllowableDiffFromDBTime || diff < -maxAllowableDiffFromDBTime) {
178                    // off by more than maxAllowableDiffFromDBTime so lets adjust
179                    result = diff;
180                }
181            }
182            return result;
183        }
184    
185        public void doStop(ServiceStopper stopper) throws Exception {
186            releaseLease();
187            stopping = true;
188        }
189    
190        private void releaseLease() {
191            Connection connection = null;
192            PreparedStatement statement = null;
193            try {
194                connection = getConnection();
195                statement = connection.prepareStatement(statements.getLeaseUpdateStatement());
196                statement.setString(1, null);
197                statement.setLong(2, 0l);
198                statement.setString(3, getLeaseHolderId());
199                if (statement.executeUpdate() == 1) {
200                    LOG.info(getLeaseHolderId() + ", released lease");
201                }
202            } catch (Exception e) {
203                LOG.error(getLeaseHolderId() + " failed to release lease: " + e, e);
204            } finally {
205                close(statement);
206                close(connection);
207            }
208        }
209    
210        @Override
211        public boolean keepAlive() throws IOException {
212            boolean result = false;
213            final String sql = statements.getLeaseUpdateStatement();
214            LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);
215    
216            Connection connection = null;
217            PreparedStatement statement = null;
218            try {
219                connection = getConnection();
220    
221                initTimeDiff(connection);
222                statement = connection.prepareStatement(sql);
223                setQueryTimeout(statement);
224    
225                final long now = System.currentTimeMillis() + diffFromCurrentTime;
226                statement.setString(1, getLeaseHolderId());
227                statement.setLong(2, now + lockAcquireSleepInterval);
228                statement.setString(3, getLeaseHolderId());
229    
230                result = (statement.executeUpdate() == 1);
231            } catch (Exception e) {
232                LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
233                IOException ioe = IOExceptionSupport.create(e);
234                persistenceAdapter.getBrokerService().handleIOException(ioe);
235                throw ioe;
236            } finally {
237                close(statement);
238                close(connection);
239            }
240            return result;
241        }
242    
243        public long getLockAcquireSleepInterval() {
244            return lockAcquireSleepInterval;
245        }
246    
247        public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
248            this.lockAcquireSleepInterval = lockAcquireSleepInterval;
249        }
250        
251        public int getQueryTimeout() {
252            return queryTimeout;
253        }
254    
255        public void setQueryTimeout(int queryTimeout) {
256            this.queryTimeout = queryTimeout;
257        }
258    
259        public String getLeaseHolderId() {
260            if (leaseHolderId == null) {
261                if (persistenceAdapter.getBrokerService() != null) {
262                    leaseHolderId = persistenceAdapter.getBrokerService().getBrokerName();
263                }
264            }
265            return leaseHolderId;
266        }
267    
268        public void setLeaseHolderId(String leaseHolderId) {
269            this.leaseHolderId = leaseHolderId;
270        }
271    
272        public int getMaxAllowableDiffFromDBTime() {
273            return maxAllowableDiffFromDBTime;
274        }
275    
276        public void setMaxAllowableDiffFromDBTime(int maxAllowableDiffFromDBTime) {
277            this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime;
278        }
279    }