001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.Connection;
021import java.sql.PreparedStatement;
022import java.sql.SQLException;
023import java.sql.Statement;
024import java.util.LinkedList;
025import java.util.List;
026
027import javax.sql.DataSource;
028
029import org.apache.activemq.util.IOExceptionSupport;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * Helps keep track of the current transaction/JDBC connection.
035 */
036public class TransactionContext {
037
038    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
039
040    private final DataSource dataSource;
041    private final JDBCPersistenceAdapter persistenceAdapter;
042    private Connection connection;
043    private boolean inTx;
044    private PreparedStatement addMessageStatement;
045    private PreparedStatement removedMessageStatement;
046    private PreparedStatement updateLastAckStatement;
047    // a cheap dirty level that we can live with    
048    private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
049    private LinkedList<Runnable> completions = new LinkedList<Runnable>();
050
051    public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
052        this.persistenceAdapter = persistenceAdapter;
053        this.dataSource = persistenceAdapter.getDataSource();
054    }
055
056    public Connection getConnection() throws IOException {
057        if (connection == null) {
058            try {
059                connection = dataSource.getConnection();
060                if (persistenceAdapter.isChangeAutoCommitAllowed()) {
061                    boolean autoCommit = !inTx;
062                    if (connection.getAutoCommit() != autoCommit) {
063                        LOG.trace("Setting auto commit to {} on connection {}", autoCommit, connection);
064                        connection.setAutoCommit(autoCommit);
065                    }
066                }
067            } catch (SQLException e) {
068                JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e);
069                inTx = false;
070                close();
071                IOException ioe = IOExceptionSupport.create(e);
072                if (persistenceAdapter.getBrokerService() != null) {
073                    persistenceAdapter.getBrokerService().handleIOException(ioe);
074                }
075                throw ioe;
076            }
077
078            try {
079                connection.setTransactionIsolation(transactionIsolation);
080            } catch (Throwable e) {
081                // ignore
082                LOG.trace("Cannot set transaction isolation to " + transactionIsolation + " due " + e.getMessage()
083                        + ". This exception is ignored.", e);
084            }
085        }
086        return connection;
087    }
088
089    public void executeBatch() throws SQLException {
090        try {
091            executeBatch(addMessageStatement, "Failed add a message");
092        } finally {
093            addMessageStatement = null;
094            try {
095                executeBatch(removedMessageStatement, "Failed to remove a message");
096            } finally {
097                removedMessageStatement = null;
098                try {
099                    executeBatch(updateLastAckStatement, "Failed to ack a message");
100                } finally {
101                    updateLastAckStatement = null;
102                }
103            }
104        }
105    }
106
107    private void executeBatch(PreparedStatement p, String message) throws SQLException {
108        if (p == null) {
109            return;
110        }
111
112        try {
113            int[] rc = p.executeBatch();
114            for (int i = 0; i < rc.length; i++) {
115                int code = rc[i];
116                if (code < 0 && code != Statement.SUCCESS_NO_INFO) {
117                    throw new SQLException(message + ". Response code: " + code);
118                }
119            }
120        } finally {
121            try {
122                p.close();
123            } catch (Throwable e) {
124            }
125        }
126    }
127
128    public void close() throws IOException {
129        if (!inTx) {
130            try {
131
132                /**
133                 * we are not in a transaction so should not be committing ??
134                 * This was previously commented out - but had adverse affects
135                 * on testing - so it's back!
136                 * 
137                 */
138                try {
139                    executeBatch();
140                } finally {
141                    if (connection != null && !connection.getAutoCommit()) {
142                        connection.commit();
143                    }
144                }
145
146            } catch (SQLException e) {
147                JDBCPersistenceAdapter.log("Error while closing connection: ", e);
148                IOException ioe = IOExceptionSupport.create(e);
149                persistenceAdapter.getBrokerService().handleIOException(ioe);
150                throw ioe;
151            } finally {
152                try {
153                    if (connection != null) {
154                        connection.close();
155                    }
156                } catch (Throwable e) {
157                    // ignore
158                    LOG.trace("Closing connection failed due: " + e.getMessage() + ". This exception is ignored.", e);
159                } finally {
160                    connection = null;
161                }
162                for (Runnable completion: completions) {
163                    completion.run();
164                }
165                completions.clear();
166            }
167        }
168    }
169
170    public void begin() throws IOException {
171        if (inTx) {
172            throw new IOException("Already started.");
173        }
174        inTx = true;
175        connection = getConnection();
176    }
177
178    public void commit() throws IOException {
179        if (!inTx) {
180            throw new IOException("Not started.");
181        }
182        try {
183            executeBatch();
184            if (!connection.getAutoCommit()) {
185                connection.commit();
186            }
187        } catch (SQLException e) {
188            JDBCPersistenceAdapter.log("Commit failed: ", e);
189            try {
190                doRollback();
191            } catch (Exception ignored) {}
192            IOException ioe = IOExceptionSupport.create(e);
193            persistenceAdapter.getBrokerService().handleIOException(ioe);
194            throw ioe;
195        } finally {
196            inTx = false;
197            close();
198        }
199    }
200
201    public void rollback() throws IOException {
202        if (!inTx) {
203            throw new IOException("Not started.");
204        }
205        try {
206            doRollback();
207        } catch (SQLException e) {
208            JDBCPersistenceAdapter.log("Rollback failed: ", e);
209            throw IOExceptionSupport.create(e);
210        } finally {
211            inTx = false;
212            close();
213        }
214    }
215
216    private void doRollback() throws SQLException {
217        if (addMessageStatement != null) {
218            addMessageStatement.close();
219            addMessageStatement = null;
220        }
221        if (removedMessageStatement != null) {
222            removedMessageStatement.close();
223            removedMessageStatement = null;
224        }
225        if (updateLastAckStatement != null) {
226            updateLastAckStatement.close();
227            updateLastAckStatement = null;
228        }
229        connection.rollback();
230    }
231
232    public PreparedStatement getAddMessageStatement() {
233        return addMessageStatement;
234    }
235
236    public void setAddMessageStatement(PreparedStatement addMessageStatement) {
237        this.addMessageStatement = addMessageStatement;
238    }
239
240    public PreparedStatement getUpdateLastAckStatement() {
241        return updateLastAckStatement;
242    }
243
244    public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) {
245        this.updateLastAckStatement = ackMessageStatement;
246    }
247
248    public PreparedStatement getRemovedMessageStatement() {
249        return removedMessageStatement;
250    }
251
252    public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) {
253        this.removedMessageStatement = removedMessageStatement;
254    }
255    
256    public void setTransactionIsolation(int transactionIsolation) {
257        this.transactionIsolation = transactionIsolation;
258    }
259
260    public void onCompletion(Runnable runnable) {
261        completions.add(runnable);
262    }
263}