001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.SQLException;
023    import java.sql.SQLFeatureNotSupportedException;
024    
025    import javax.sql.DataSource;
026    
027    import org.apache.activemq.broker.AbstractLocker;
028    import org.apache.activemq.store.PersistenceAdapter;
029    import org.apache.activemq.util.Handler;
030    import org.apache.activemq.util.ServiceStopper;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    /**
035     * Represents an exclusive lock on a database to avoid multiple brokers running
036     * against the same logical database.
037     * 
038     * @org.apache.xbean.XBean element="database-locker"
039     * 
040     */
041    public class DefaultDatabaseLocker extends AbstractLocker {
042        private static final Logger LOG = LoggerFactory.getLogger(DefaultDatabaseLocker.class);
043        protected DataSource dataSource;
044        protected Statements statements;
045    
046        protected volatile PreparedStatement lockCreateStatement;
047        protected volatile PreparedStatement lockUpdateStatement;
048        protected volatile Connection connection;
049        protected volatile boolean stopping;
050        protected Handler<Exception> exceptionHandler;
051        protected int queryTimeout = 10;
052    
053        public void configure(PersistenceAdapter adapter) throws IOException {
054            if (adapter instanceof JDBCPersistenceAdapter) {
055                this.dataSource = ((JDBCPersistenceAdapter) adapter).getLockDataSource();
056                this.statements = ((JDBCPersistenceAdapter) adapter).getStatements();
057            }
058        }
059        
060        public void doStart() throws Exception {
061            stopping = false;
062    
063            LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
064            String sql = statements.getLockCreateStatement();
065            LOG.debug("Locking Query is "+sql);
066            
067            while (true) {
068                try {
069                    connection = dataSource.getConnection();
070                    connection.setAutoCommit(false);
071                    lockCreateStatement = connection.prepareStatement(sql);
072                    lockCreateStatement.execute();
073                    break;
074                } catch (Exception e) {
075                    try {
076                        if (stopping) {
077                            throw new Exception(
078                                    "Cannot start broker as being asked to shut down. " 
079                                            + "Interrupted attempt to acquire lock: "
080                                            + e, e);
081                        }
082                        if (exceptionHandler != null) {
083                            try {
084                                exceptionHandler.handle(e);
085                            } catch (Throwable handlerException) {
086                                LOG.error( "The exception handler "
087                                        + exceptionHandler.getClass().getCanonicalName()
088                                        + " threw this exception: "
089                                        + handlerException
090                                        + " while trying to handle this exception: "
091                                        + e, handlerException);
092                            }
093    
094                        } else {
095                            LOG.debug("Lock failure: "+ e, e);
096                        }
097                    } finally {
098                        // Let's make sure the database connection is properly
099                        // closed when an error occurs so that we're not leaking
100                        // connections 
101                        if (null != connection) {
102                            try {
103                                connection.rollback();
104                            } catch (SQLException e1) {
105                                LOG.error("Caught exception during rollback on connection: " + e1, e1);
106                            }
107                            try {
108                                connection.close();
109                            } catch (SQLException e1) {
110                                LOG.error("Caught exception while closing connection: " + e1, e1);
111                            }
112                            
113                            connection = null;
114                        }
115                    }
116                } finally {
117                    if (null != lockCreateStatement) {
118                        try {
119                            lockCreateStatement.close();
120                        } catch (SQLException e1) {
121                            LOG.debug("Caught while closing statement: " + e1, e1);
122                        }
123                        lockCreateStatement = null;
124                    }
125                }
126    
127                LOG.info("Failed to acquire lock.  Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
128                try {
129                    Thread.sleep(lockAcquireSleepInterval);
130                } catch (InterruptedException ie) {
131                    LOG.warn("Master lock retry sleep interrupted", ie);
132                }
133            }
134    
135            LOG.info("Becoming the master on dataSource: " + dataSource);
136        }
137    
138        public void doStop(ServiceStopper stopper) throws Exception {
139            stopping = true;
140            try {
141                if (lockCreateStatement != null) {
142                    lockCreateStatement.cancel();                           
143                }
144            } catch (SQLFeatureNotSupportedException e) {
145                LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e);                   
146            }
147            try {
148                if (lockUpdateStatement != null) {
149                        lockUpdateStatement.cancel();                       
150                }
151            } catch (SQLFeatureNotSupportedException e) {
152                LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e);                   
153            }
154    
155            // when the connection is closed from an outside source (lost TCP
156            // connection, db server, etc) and this connection is managed by a pool
157            // it is important to close the connection so that we don't leak
158            // connections
159    
160            if (connection != null) {
161                try {
162                    connection.rollback();
163                } catch (SQLException sqle) {
164                    LOG.warn("Exception while rollbacking the connection on shutdown. This exception is ignored.", sqle);
165                } finally {
166                    try {
167                        connection.close();
168                    } catch (SQLException ignored) {
169                        LOG.debug("Exception while closing connection on shutdown. This exception is ignored.", ignored);
170                    }
171                    lockCreateStatement = null;
172                }
173            }
174        }
175    
176        public boolean keepAlive() throws IOException {
177            boolean result = false;
178            try {
179                lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());
180                lockUpdateStatement.setLong(1, System.currentTimeMillis());
181                if (queryTimeout > 0) {
182                    lockUpdateStatement.setQueryTimeout(queryTimeout);
183                }
184                int rows = lockUpdateStatement.executeUpdate();
185                if (rows == 1) {
186                    result=true;
187                }
188            } catch (Exception e) {
189                LOG.error("Failed to update database lock: " + e, e);
190            } finally {
191                if (lockUpdateStatement != null) {
192                    try {
193                        lockUpdateStatement.close();
194                    } catch (SQLException e) {
195                        LOG.error("Failed to close statement",e);
196                    }
197                    lockUpdateStatement = null;
198                }
199            }
200            return result;
201        }
202     
203        public long getLockAcquireSleepInterval() {
204            return lockAcquireSleepInterval;
205        }
206    
207        public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
208            this.lockAcquireSleepInterval = lockAcquireSleepInterval;
209        }
210        
211        public Handler getExceptionHandler() {
212            return exceptionHandler;
213        }
214    
215        public void setExceptionHandler(Handler exceptionHandler) {
216            this.exceptionHandler = exceptionHandler;
217        }
218    
219        public int getQueryTimeout() {
220            return queryTimeout;
221        }
222    
223        public void setQueryTimeout(int queryTimeout) {
224            this.queryTimeout = queryTimeout;
225        }
226    }