001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.SQLException;
023    import java.sql.SQLFeatureNotSupportedException;
024    
025    import javax.sql.DataSource;
026    
027    import org.apache.activemq.broker.AbstractLocker;
028    import org.apache.activemq.store.PersistenceAdapter;
029    import org.apache.activemq.util.Handler;
030    import org.apache.activemq.util.ServiceStopper;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    /**
035     * Represents an exclusive lock on a database to avoid multiple brokers running
036     * against the same logical database.
037     * 
038     * @org.apache.xbean.XBean element="database-locker"
039     * 
040     */
041    public class DefaultDatabaseLocker extends AbstractLocker {
042        public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 1000;
043        private static final Logger LOG = LoggerFactory.getLogger(DefaultDatabaseLocker.class);
044        protected DataSource dataSource;
045        protected Statements statements;
046    
047        protected volatile PreparedStatement lockCreateStatement;
048        protected volatile PreparedStatement lockUpdateStatement;
049        protected volatile Connection connection;
050        protected volatile boolean stopping;
051        protected Handler<Exception> exceptionHandler;
052        protected int queryTimeout = 10;
053    
054        public void configure(PersistenceAdapter adapter) throws IOException {
055            if (adapter instanceof JDBCPersistenceAdapter) {
056                this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
057                this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
058            }
059            lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
060        }
061        
062        public void doStart() throws Exception {
063            stopping = false;
064    
065            LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
066            String sql = statements.getLockCreateStatement();
067            LOG.debug("Locking Query is "+sql);
068            
069            while (true) {
070                try {
071                    connection = dataSource.getConnection();
072                    connection.setAutoCommit(false);
073                    lockCreateStatement = connection.prepareStatement(sql);
074                    lockCreateStatement.execute();
075                    break;
076                } catch (Exception e) {
077                    try {
078                        if (stopping) {
079                            throw new Exception(
080                                    "Cannot start broker as being asked to shut down. " 
081                                            + "Interrupted attempt to acquire lock: "
082                                            + e, e);
083                        }
084                        if (exceptionHandler != null) {
085                            try {
086                                exceptionHandler.handle(e);
087                            } catch (Throwable handlerException) {
088                                LOG.error( "The exception handler "
089                                        + exceptionHandler.getClass().getCanonicalName()
090                                        + " threw this exception: "
091                                        + handlerException
092                                        + " while trying to handle this exception: "
093                                        + e, handlerException);
094                            }
095    
096                        } else {
097                            LOG.debug("Lock failure: "+ e, e);
098                        }
099                    } finally {
100                        // Let's make sure the database connection is properly
101                        // closed when an error occurs so that we're not leaking
102                        // connections 
103                        if (null != connection) {
104                            try {
105                                connection.close();
106                            } catch (SQLException e1) {
107                                LOG.error("Caught exception while closing connection: " + e1, e1);
108                            }
109                            
110                            connection = null;
111                        }
112                    }
113                } finally {
114                    if (null != lockCreateStatement) {
115                        try {
116                            lockCreateStatement.close();
117                        } catch (SQLException e1) {
118                            LOG.debug("Caught while closing statement: " + e1, e1);
119                        }
120                        lockCreateStatement = null;
121                    }
122                }
123    
124                LOG.info("Failed to acquire lock.  Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
125                try {
126                    Thread.sleep(lockAcquireSleepInterval);
127                } catch (InterruptedException ie) {
128                    LOG.warn("Master lock retry sleep interrupted", ie);
129                }
130            }
131    
132            LOG.info("Becoming the master on dataSource: " + dataSource);
133        }
134    
135        public void doStop(ServiceStopper stopper) throws Exception {
136            stopping = true;
137            try {
138                if (lockCreateStatement != null) {
139                    lockCreateStatement.cancel();                           
140                }
141            } catch (SQLFeatureNotSupportedException e) {
142                LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e);                   
143            }
144            try {
145                if (lockUpdateStatement != null) {
146                        lockUpdateStatement.cancel();                       
147                }
148            } catch (SQLFeatureNotSupportedException e) {
149                LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e);                   
150            }
151    
152            // when the connection is closed from an outside source (lost TCP
153            // connection, db server, etc) and this connection is managed by a pool
154            // it is important to close the connection so that we don't leak
155            // connections
156    
157            if (connection != null) {
158                try {
159                    connection.rollback();
160                } catch (SQLException sqle) {
161                    LOG.warn("Exception while rollbacking the connection on shutdown. This exception is ignored.", sqle);
162                } finally {
163                    try {
164                        connection.close();
165                    } catch (SQLException ignored) {
166                        LOG.debug("Exception while closing connection on shutdown. This exception is ignored.", ignored);
167                    }
168                    lockCreateStatement = null;
169                }
170            }
171        }
172    
173        public boolean keepAlive() throws IOException {
174            boolean result = false;
175            try {
176                lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());
177                lockUpdateStatement.setLong(1, System.currentTimeMillis());
178                if (queryTimeout > 0) {
179                    lockUpdateStatement.setQueryTimeout(queryTimeout);
180                }
181                int rows = lockUpdateStatement.executeUpdate();
182                if (rows == 1) {
183                    result=true;
184                }
185            } catch (Exception e) {
186                LOG.error("Failed to update database lock: " + e, e);
187            } finally {
188                if (lockUpdateStatement != null) {
189                    try {
190                        lockUpdateStatement.close();
191                    } catch (SQLException e) {
192                        LOG.error("Failed to close statement",e);
193                    }
194                    lockUpdateStatement = null;
195                }
196            }
197            return result;
198        }
199     
200        public long getLockAcquireSleepInterval() {
201            return lockAcquireSleepInterval;
202        }
203    
204        public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
205            this.lockAcquireSleepInterval = lockAcquireSleepInterval;
206        }
207        
208        public Handler getExceptionHandler() {
209            return exceptionHandler;
210        }
211    
212        public void setExceptionHandler(Handler exceptionHandler) {
213            this.exceptionHandler = exceptionHandler;
214        }
215    
216        public int getQueryTimeout() {
217            return queryTimeout;
218        }
219    
220        public void setQueryTimeout(int queryTimeout) {
221            this.queryTimeout = queryTimeout;
222        }
223    }