001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.SQLException;
023    import java.sql.Statement;
024    
025    import javax.sql.DataSource;
026    
027    import org.apache.activemq.util.IOExceptionSupport;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * Helps keep track of the current transaction/JDBC connection.
033     * 
034     * 
035     */
036    public class TransactionContext {
037    
038        private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
039    
040        private final DataSource dataSource;
041        private final JDBCPersistenceAdapter persistenceAdapter;
042        private Connection connection;
043        private boolean inTx;
044        private PreparedStatement addMessageStatement;
045        private PreparedStatement removedMessageStatement;
046        private PreparedStatement updateLastAckStatement;
047        // a cheap dirty level that we can live with    
048        private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
049        
050        public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
051            this.persistenceAdapter = persistenceAdapter;
052            this.dataSource = persistenceAdapter.getDataSource();
053        }
054    
055        public Connection getConnection() throws IOException {
056            if (connection == null) {
057                try {
058                    connection = dataSource.getConnection();
059                    boolean autoCommit = !inTx;
060                    if (connection.getAutoCommit() != autoCommit) {
061                        connection.setAutoCommit(autoCommit);
062                    }
063                } catch (SQLException e) {
064                    JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e);
065                    IOException ioe = IOExceptionSupport.create(e);
066                    persistenceAdapter.getBrokerService().handleIOException(ioe);
067                    throw ioe;
068    
069                }
070    
071                try {
072                    connection.setTransactionIsolation(transactionIsolation);
073                } catch (Throwable e) {
074                }
075            }
076            return connection;
077        }
078    
079        public void executeBatch() throws SQLException {
080            try {
081                executeBatch(addMessageStatement, "Failed add a message");
082            } finally {
083                addMessageStatement = null;
084                try {
085                    executeBatch(removedMessageStatement, "Failed to remove a message");
086                } finally {
087                    removedMessageStatement = null;
088                    try {
089                        executeBatch(updateLastAckStatement, "Failed to ack a message");
090                    } finally {
091                        updateLastAckStatement = null;
092                    }
093                }
094            }
095        }
096    
097        private void executeBatch(PreparedStatement p, String message) throws SQLException {
098            if (p == null) {
099                return;
100            }
101    
102            try {
103                int[] rc = p.executeBatch();
104                for (int i = 0; i < rc.length; i++) {
105                    int code = rc[i];
106                    if (code < 0 && code != Statement.SUCCESS_NO_INFO) {
107                        throw new SQLException(message + ". Response code: " + code);
108                    }
109                }
110            } finally {
111                try {
112                    p.close();
113                } catch (Throwable e) {
114                }
115            }
116        }
117    
118        public void close() throws IOException {
119            if (!inTx) {
120                try {
121    
122                    /**
123                     * we are not in a transaction so should not be committing ??
124                     * This was previously commented out - but had adverse affects
125                     * on testing - so it's back!
126                     * 
127                     */
128                    try {
129                        executeBatch();
130                    } finally {
131                        if (connection != null && !connection.getAutoCommit()) {
132                            connection.commit();
133                        }
134                    }
135    
136                } catch (SQLException e) {
137                    JDBCPersistenceAdapter.log("Error while closing connection: ", e);
138                    throw IOExceptionSupport.create(e);
139                } finally {
140                    try {
141                        if (connection != null) {
142                            connection.close();
143                        }
144                    } catch (Throwable e) {
145                        LOG.warn("Close failed: " + e.getMessage(), e);
146                    } finally {
147                        connection = null;
148                    }
149                }
150            }
151        }
152    
153        public void begin() throws IOException {
154            if (inTx) {
155                throw new IOException("Already started.");
156            }
157            inTx = true;
158            connection = getConnection();
159        }
160    
161        public void commit() throws IOException {
162            if (!inTx) {
163                throw new IOException("Not started.");
164            }
165            try {
166                executeBatch();
167                if (!connection.getAutoCommit()) {
168                    connection.commit();
169                }
170            } catch (SQLException e) {
171                JDBCPersistenceAdapter.log("Commit failed: ", e);
172                
173                this.rollback(); 
174                
175                throw IOExceptionSupport.create(e);
176            } finally {
177                inTx = false;
178                close();
179            }
180        }
181    
182        public void rollback() throws IOException {
183            if (!inTx) {
184                throw new IOException("Not started.");
185            }
186            try {
187                if (addMessageStatement != null) {
188                    addMessageStatement.close();
189                    addMessageStatement = null;
190                }
191                if (removedMessageStatement != null) {
192                    removedMessageStatement.close();
193                    removedMessageStatement = null;
194                }
195                if (updateLastAckStatement != null) {
196                    updateLastAckStatement.close();
197                    updateLastAckStatement = null;
198                }
199                connection.rollback();
200    
201            } catch (SQLException e) {
202                JDBCPersistenceAdapter.log("Rollback failed: ", e);
203                throw IOExceptionSupport.create(e);
204            } finally {
205                inTx = false;
206                close();
207            }
208        }
209    
210        public PreparedStatement getAddMessageStatement() {
211            return addMessageStatement;
212        }
213    
214        public void setAddMessageStatement(PreparedStatement addMessageStatement) {
215            this.addMessageStatement = addMessageStatement;
216        }
217    
218        public PreparedStatement getUpdateLastAckStatement() {
219            return updateLastAckStatement;
220        }
221    
222        public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) {
223            this.updateLastAckStatement = ackMessageStatement;
224        }
225    
226        public PreparedStatement getRemovedMessageStatement() {
227            return removedMessageStatement;
228        }
229    
230        public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) {
231            this.removedMessageStatement = removedMessageStatement;
232        }
233        
234        public void setTransactionIsolation(int transactionIsolation) {
235            this.transactionIsolation = transactionIsolation;
236        }
237    
238    }