001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.sql.*;
021import java.util.LinkedList;
022import java.util.Map;
023import java.util.Properties;
024import java.util.concurrent.Executor;
025import java.util.concurrent.locks.Lock;
026import java.util.concurrent.locks.ReentrantReadWriteLock;
027
028import javax.sql.DataSource;
029
030import org.apache.activemq.util.IOExceptionSupport;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Helps keep track of the current transaction/JDBC connection.
036 */
037public class TransactionContext {
038
039    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
040
041    private final DataSource dataSource;
042    private final JDBCPersistenceAdapter persistenceAdapter;
043    private Connection connection;
044    private boolean inTx;
045    private PreparedStatement addMessageStatement;
046    private PreparedStatement removedMessageStatement;
047    private PreparedStatement updateLastAckStatement;
048    // a cheap dirty level that we can live with    
049    private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
050    private LinkedList<Runnable> completions = new LinkedList<Runnable>();
051    private ReentrantReadWriteLock exclusiveConnectionLock = new ReentrantReadWriteLock();
052
053    public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
054        this.persistenceAdapter = persistenceAdapter;
055        this.dataSource = persistenceAdapter.getDataSource();
056    }
057
058    public Connection getExclusiveConnection() throws IOException {
059        return lockAndWrapped(exclusiveConnectionLock.writeLock());
060    }
061
062    public Connection getConnection() throws IOException {
063        return lockAndWrapped(exclusiveConnectionLock.readLock());
064    }
065
066    private Connection lockAndWrapped(Lock toLock) throws IOException {
067        if (connection == null) {
068            toLock.lock();
069            try {
070                connection = dataSource.getConnection();
071                if (persistenceAdapter.isChangeAutoCommitAllowed()) {
072                    boolean autoCommit = !inTx;
073                    if (connection.getAutoCommit() != autoCommit) {
074                        LOG.trace("Setting auto commit to {} on connection {}", autoCommit, connection);
075                        connection.setAutoCommit(autoCommit);
076                    }
077                }
078                connection = new UnlockOnCloseConnection(connection, toLock);
079            } catch (SQLException e) {
080                JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e);
081                inTx = false;
082                try {
083                    toLock.unlock();
084                } catch (IllegalMonitorStateException oops) {
085                    LOG.error("Thread does not hold the context lock on close of:"  + connection, oops);
086                }
087                close();
088                IOException ioe = IOExceptionSupport.create(e);
089                if (persistenceAdapter.getBrokerService() != null) {
090                    persistenceAdapter.getBrokerService().handleIOException(ioe);
091                }
092                throw ioe;
093            }
094
095            try {
096                connection.setTransactionIsolation(transactionIsolation);
097            } catch (Throwable e) {
098                // ignore
099                LOG.trace("Cannot set transaction isolation to " + transactionIsolation + " due " + e.getMessage()
100                        + ". This exception is ignored.", e);
101            }
102        }
103        return connection;
104    }
105
106    public void executeBatch() throws SQLException {
107        try {
108            executeBatch(addMessageStatement, "Failed add a message");
109        } finally {
110            addMessageStatement = null;
111            try {
112                executeBatch(removedMessageStatement, "Failed to remove a message");
113            } finally {
114                removedMessageStatement = null;
115                try {
116                    executeBatch(updateLastAckStatement, "Failed to ack a message");
117                } finally {
118                    updateLastAckStatement = null;
119                }
120            }
121        }
122    }
123
124    private void executeBatch(PreparedStatement p, String message) throws SQLException {
125        if (p == null) {
126            return;
127        }
128
129        try {
130            int[] rc = p.executeBatch();
131            for (int i = 0; i < rc.length; i++) {
132                int code = rc[i];
133                if (code < 0 && code != Statement.SUCCESS_NO_INFO) {
134                    throw new SQLException(message + ". Response code: " + code);
135                }
136            }
137        } finally {
138            try {
139                p.close();
140            } catch (Throwable e) {
141            }
142        }
143    }
144
145    public void close() throws IOException {
146        if (!inTx) {
147            try {
148
149                /**
150                 * we are not in a transaction so should not be committing ??
151                 * This was previously commented out - but had adverse affects
152                 * on testing - so it's back!
153                 * 
154                 */
155                try {
156                    executeBatch();
157                } finally {
158                    if (connection != null && !connection.getAutoCommit()) {
159                        connection.commit();
160                    }
161                }
162
163            } catch (SQLException e) {
164                JDBCPersistenceAdapter.log("Error while closing connection: ", e);
165                IOException ioe = IOExceptionSupport.create(e);
166                persistenceAdapter.getBrokerService().handleIOException(ioe);
167                throw ioe;
168            } finally {
169                try {
170                    if (connection != null) {
171                        connection.close();
172                    }
173                } catch (Throwable e) {
174                    // ignore
175                    LOG.trace("Closing connection failed due: " + e.getMessage() + ". This exception is ignored.", e);
176                } finally {
177                    connection = null;
178                }
179                for (Runnable completion: completions) {
180                    completion.run();
181                }
182                completions.clear();
183            }
184        }
185    }
186
187    public void begin() throws IOException {
188        if (inTx) {
189            throw new IOException("Already started.");
190        }
191        inTx = true;
192        connection = getConnection();
193    }
194
195    public void commit() throws IOException {
196        if (!inTx) {
197            throw new IOException("Not started.");
198        }
199        try {
200            executeBatch();
201            if (!connection.getAutoCommit()) {
202                connection.commit();
203            }
204        } catch (SQLException e) {
205            JDBCPersistenceAdapter.log("Commit failed: ", e);
206            try {
207                doRollback();
208            } catch (Exception ignored) {}
209            IOException ioe = IOExceptionSupport.create(e);
210            persistenceAdapter.getBrokerService().handleIOException(ioe);
211            throw ioe;
212        } finally {
213            inTx = false;
214            close();
215        }
216    }
217
218    public void rollback() throws IOException {
219        if (!inTx) {
220            throw new IOException("Not started.");
221        }
222        try {
223            doRollback();
224        } catch (SQLException e) {
225            JDBCPersistenceAdapter.log("Rollback failed: ", e);
226            throw IOExceptionSupport.create(e);
227        } finally {
228            inTx = false;
229            close();
230        }
231    }
232
233    private void doRollback() throws SQLException {
234        if (addMessageStatement != null) {
235            addMessageStatement.close();
236            addMessageStatement = null;
237        }
238        if (removedMessageStatement != null) {
239            removedMessageStatement.close();
240            removedMessageStatement = null;
241        }
242        if (updateLastAckStatement != null) {
243            updateLastAckStatement.close();
244            updateLastAckStatement = null;
245        }
246        connection.rollback();
247    }
248
249    public PreparedStatement getAddMessageStatement() {
250        return addMessageStatement;
251    }
252
253    public void setAddMessageStatement(PreparedStatement addMessageStatement) {
254        this.addMessageStatement = addMessageStatement;
255    }
256
257    public PreparedStatement getUpdateLastAckStatement() {
258        return updateLastAckStatement;
259    }
260
261    public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) {
262        this.updateLastAckStatement = ackMessageStatement;
263    }
264
265    public PreparedStatement getRemovedMessageStatement() {
266        return removedMessageStatement;
267    }
268
269    public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) {
270        this.removedMessageStatement = removedMessageStatement;
271    }
272    
273    public void setTransactionIsolation(int transactionIsolation) {
274        this.transactionIsolation = transactionIsolation;
275    }
276
277    public void onCompletion(Runnable runnable) {
278        completions.add(runnable);
279    }
280
281    final private class UnlockOnCloseConnection implements Connection {
282
283        private final Connection delegate;
284        private final Lock lock;
285
286        UnlockOnCloseConnection(Connection delegate, Lock toUnlockOnClose) {
287            this.delegate = delegate;
288            this.lock = toUnlockOnClose;
289        }
290
291        @Override
292        public void close() throws SQLException {
293            try {
294                delegate.close();
295            } finally {
296                lock.unlock();
297            }
298        }
299
300        // simple delegate for the  rest of the impl..
301        @Override
302        public Statement createStatement() throws SQLException {
303            return delegate.createStatement();
304        }
305
306        @Override
307        public PreparedStatement prepareStatement(String sql) throws SQLException {
308            return delegate.prepareStatement(sql);
309        }
310
311        @Override
312        public CallableStatement prepareCall(String sql) throws SQLException {
313            return delegate.prepareCall(sql);
314        }
315
316        @Override
317        public String nativeSQL(String sql) throws SQLException {
318            return delegate.nativeSQL(sql);
319        }
320
321        @Override
322        public void setAutoCommit(boolean autoCommit) throws SQLException {
323            delegate.setAutoCommit(autoCommit);
324        }
325
326        @Override
327        public boolean getAutoCommit() throws SQLException {
328            return delegate.getAutoCommit();
329        }
330
331        @Override
332        public void commit() throws SQLException {
333            delegate.commit();
334        }
335
336        @Override
337        public void rollback() throws SQLException {
338            delegate.rollback();
339        }
340
341        @Override
342        public boolean isClosed() throws SQLException {
343            return delegate.isClosed();
344        }
345
346        @Override
347        public DatabaseMetaData getMetaData() throws SQLException {
348            return delegate.getMetaData();
349        }
350
351        @Override
352        public void setReadOnly(boolean readOnly) throws SQLException {
353            delegate.setReadOnly(readOnly);
354        }
355
356        @Override
357        public boolean isReadOnly() throws SQLException {
358            return delegate.isReadOnly();
359        }
360
361        @Override
362        public void setCatalog(String catalog) throws SQLException {
363            delegate.setCatalog(catalog);
364        }
365
366        @Override
367        public String getCatalog() throws SQLException {
368            return delegate.getCatalog();
369        }
370
371        @Override
372        public void setTransactionIsolation(int level) throws SQLException {
373            delegate.setTransactionIsolation(level);
374        }
375
376        @Override
377        public int getTransactionIsolation() throws SQLException {
378            return delegate.getTransactionIsolation();
379        }
380
381        @Override
382        public SQLWarning getWarnings() throws SQLException {
383            return delegate.getWarnings();
384        }
385
386        @Override
387        public void clearWarnings() throws SQLException {
388            delegate.clearWarnings();
389        }
390
391        @Override
392        public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
393            return delegate.createStatement(resultSetType, resultSetConcurrency);
394        }
395
396        @Override
397        public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
398            return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency);
399        }
400
401        @Override
402        public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
403            return delegate.prepareCall(sql, resultSetType, resultSetConcurrency);
404        }
405
406        @Override
407        public Map<String, Class<?>> getTypeMap() throws SQLException {
408            return delegate.getTypeMap();
409        }
410
411        @Override
412        public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
413            delegate.setTypeMap(map);
414        }
415
416        @Override
417        public void setHoldability(int holdability) throws SQLException {
418            delegate.setHoldability(holdability);
419        }
420
421        @Override
422        public int getHoldability() throws SQLException {
423            return delegate.getHoldability();
424        }
425
426        @Override
427        public Savepoint setSavepoint() throws SQLException {
428            return delegate.setSavepoint();
429        }
430
431        @Override
432        public Savepoint setSavepoint(String name) throws SQLException {
433            return delegate.setSavepoint(name);
434        }
435
436        @Override
437        public void rollback(Savepoint savepoint) throws SQLException {
438            delegate.rollback(savepoint);
439        }
440
441        @Override
442        public void releaseSavepoint(Savepoint savepoint) throws SQLException {
443            delegate.releaseSavepoint(savepoint);
444        }
445
446        @Override
447        public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
448            return delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
449        }
450
451        @Override
452        public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
453            return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
454        }
455
456        @Override
457        public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
458            return delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
459        }
460
461        @Override
462        public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
463            return delegate.prepareStatement(sql, autoGeneratedKeys);
464        }
465
466        @Override
467        public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
468            return delegate.prepareStatement(sql, columnIndexes);
469        }
470
471        @Override
472        public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
473            return delegate.prepareStatement(sql, columnNames);
474        }
475
476        @Override
477        public Clob createClob() throws SQLException {
478            return delegate.createClob();
479        }
480
481        @Override
482        public Blob createBlob() throws SQLException {
483            return delegate.createBlob();
484        }
485
486        @Override
487        public NClob createNClob() throws SQLException {
488            return delegate.createNClob();
489        }
490
491        @Override
492        public SQLXML createSQLXML() throws SQLException {
493            return delegate.createSQLXML();
494        }
495
496        @Override
497        public boolean isValid(int timeout) throws SQLException {
498            return delegate.isValid(timeout);
499        }
500
501        @Override
502        public void setClientInfo(String name, String value) throws SQLClientInfoException {
503            delegate.setClientInfo(name, value);
504        }
505
506        @Override
507        public void setClientInfo(Properties properties) throws SQLClientInfoException {
508            delegate.setClientInfo(properties);
509        }
510
511        @Override
512        public String getClientInfo(String name) throws SQLException {
513            return delegate.getClientInfo(name);
514        }
515
516        @Override
517        public Properties getClientInfo() throws SQLException {
518            return delegate.getClientInfo();
519        }
520
521        @Override
522        public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
523            return delegate.createArrayOf(typeName, elements);
524        }
525
526        @Override
527        public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
528            return delegate.createStruct(typeName, attributes);
529        }
530
531        @Override
532        public void setSchema(String schema) throws SQLException {
533            delegate.setSchema(schema);
534        }
535
536        @Override
537        public String getSchema() throws SQLException {
538            return delegate.getSchema();
539        }
540
541        @Override
542        public void abort(Executor executor) throws SQLException {
543            delegate.abort(executor);
544        }
545
546        @Override
547        public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
548            delegate.setNetworkTimeout(executor, milliseconds);
549        }
550
551        @Override
552        public int getNetworkTimeout() throws SQLException {
553            return delegate.getNetworkTimeout();
554        }
555
556        @Override
557        public <T> T unwrap(Class<T> iface) throws SQLException {
558            return delegate.unwrap(iface);
559        }
560
561        @Override
562        public boolean isWrapperFor(Class<?> iface) throws SQLException {
563            return delegate.isWrapperFor(iface);
564        }
565    }
566}