001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.SQLException;
023    import java.sql.Statement;
024    
025    import javax.sql.DataSource;
026    
027    import org.apache.activemq.util.IOExceptionSupport;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * Helps keep track of the current transaction/JDBC connection.
033     * 
034     * 
035     */
036    public class TransactionContext {
037    
038        private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
039    
040        private final DataSource dataSource;
041        private final JDBCPersistenceAdapter persistenceAdapter;
042        private Connection connection;
043        private boolean inTx;
044        private PreparedStatement addMessageStatement;
045        private PreparedStatement removedMessageStatement;
046        private PreparedStatement updateLastAckStatement;
047        // a cheap dirty level that we can live with    
048        private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
049        
050        public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
051            this.persistenceAdapter = persistenceAdapter;
052            this.dataSource = persistenceAdapter.getDataSource();
053        }
054    
055        public Connection getConnection() throws IOException {
056            if (connection == null) {
057                try {
058                    connection = dataSource.getConnection();
059                    if (persistenceAdapter.isChangeAutoCommitAllowed()) {
060                        boolean autoCommit = !inTx;
061                        if (connection.getAutoCommit() != autoCommit) {
062                            LOG.trace("Setting auto commit to {} on connection {}", autoCommit, connection);
063                            connection.setAutoCommit(autoCommit);
064                        }
065                    }
066                } catch (SQLException e) {
067                    JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e);
068                    IOException ioe = IOExceptionSupport.create(e);
069                    persistenceAdapter.getBrokerService().handleIOException(ioe);
070                    throw ioe;
071    
072                }
073    
074                try {
075                    connection.setTransactionIsolation(transactionIsolation);
076                } catch (Throwable e) {
077                }
078            }
079            return connection;
080        }
081    
082        public void executeBatch() throws SQLException {
083            try {
084                executeBatch(addMessageStatement, "Failed add a message");
085            } finally {
086                addMessageStatement = null;
087                try {
088                    executeBatch(removedMessageStatement, "Failed to remove a message");
089                } finally {
090                    removedMessageStatement = null;
091                    try {
092                        executeBatch(updateLastAckStatement, "Failed to ack a message");
093                    } finally {
094                        updateLastAckStatement = null;
095                    }
096                }
097            }
098        }
099    
100        private void executeBatch(PreparedStatement p, String message) throws SQLException {
101            if (p == null) {
102                return;
103            }
104    
105            try {
106                int[] rc = p.executeBatch();
107                for (int i = 0; i < rc.length; i++) {
108                    int code = rc[i];
109                    if (code < 0 && code != Statement.SUCCESS_NO_INFO) {
110                        throw new SQLException(message + ". Response code: " + code);
111                    }
112                }
113            } finally {
114                try {
115                    p.close();
116                } catch (Throwable e) {
117                }
118            }
119        }
120    
121        public void close() throws IOException {
122            if (!inTx) {
123                try {
124    
125                    /**
126                     * we are not in a transaction so should not be committing ??
127                     * This was previously commented out - but had adverse affects
128                     * on testing - so it's back!
129                     * 
130                     */
131                    try {
132                        executeBatch();
133                    } finally {
134                        if (connection != null && !connection.getAutoCommit()) {
135                            connection.commit();
136                        }
137                    }
138    
139                } catch (SQLException e) {
140                    JDBCPersistenceAdapter.log("Error while closing connection: ", e);
141                    throw IOExceptionSupport.create(e);
142                } finally {
143                    try {
144                        if (connection != null) {
145                            connection.close();
146                        }
147                    } catch (Throwable e) {
148                        LOG.warn("Close failed: " + e.getMessage(), e);
149                    } finally {
150                        connection = null;
151                    }
152                }
153            }
154        }
155    
156        public void begin() throws IOException {
157            if (inTx) {
158                throw new IOException("Already started.");
159            }
160            inTx = true;
161            connection = getConnection();
162        }
163    
164        public void commit() throws IOException {
165            if (!inTx) {
166                throw new IOException("Not started.");
167            }
168            try {
169                executeBatch();
170                if (!connection.getAutoCommit()) {
171                    connection.commit();
172                }
173            } catch (SQLException e) {
174                JDBCPersistenceAdapter.log("Commit failed: ", e);
175                
176                this.rollback(); 
177                
178                throw IOExceptionSupport.create(e);
179            } finally {
180                inTx = false;
181                close();
182            }
183        }
184    
185        public void rollback() throws IOException {
186            if (!inTx) {
187                throw new IOException("Not started.");
188            }
189            try {
190                if (addMessageStatement != null) {
191                    addMessageStatement.close();
192                    addMessageStatement = null;
193                }
194                if (removedMessageStatement != null) {
195                    removedMessageStatement.close();
196                    removedMessageStatement = null;
197                }
198                if (updateLastAckStatement != null) {
199                    updateLastAckStatement.close();
200                    updateLastAckStatement = null;
201                }
202                connection.rollback();
203    
204            } catch (SQLException e) {
205                JDBCPersistenceAdapter.log("Rollback failed: ", e);
206                throw IOExceptionSupport.create(e);
207            } finally {
208                inTx = false;
209                close();
210            }
211        }
212    
213        public PreparedStatement getAddMessageStatement() {
214            return addMessageStatement;
215        }
216    
217        public void setAddMessageStatement(PreparedStatement addMessageStatement) {
218            this.addMessageStatement = addMessageStatement;
219        }
220    
221        public PreparedStatement getUpdateLastAckStatement() {
222            return updateLastAckStatement;
223        }
224    
225        public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) {
226            this.updateLastAckStatement = ackMessageStatement;
227        }
228    
229        public PreparedStatement getRemovedMessageStatement() {
230            return removedMessageStatement;
231        }
232    
233        public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) {
234            this.removedMessageStatement = removedMessageStatement;
235        }
236        
237        public void setTransactionIsolation(int transactionIsolation) {
238            this.transactionIsolation = transactionIsolation;
239        }
240    
241    }