001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.transaction;
018    
019    import java.io.IOException;
020    import javax.transaction.xa.XAException;
021    import javax.transaction.xa.XAResource;
022    import org.apache.activemq.broker.TransactionBroker;
023    import org.apache.activemq.command.ConnectionId;
024    import org.apache.activemq.command.TransactionId;
025    import org.apache.activemq.command.XATransactionId;
026    import org.apache.activemq.store.TransactionStore;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * 
032     */
033    public class XATransaction extends Transaction {
034    
035        private static final Logger LOG = LoggerFactory.getLogger(XATransaction.class);
036    
037        private final TransactionStore transactionStore;
038        private final XATransactionId xid;
039        private final TransactionBroker broker;
040        private final ConnectionId connectionId;
041    
042        public XATransaction(TransactionStore transactionStore, XATransactionId xid, TransactionBroker broker, ConnectionId connectionId) {
043            this.transactionStore = transactionStore;
044            this.xid = xid;
045            this.broker = broker;
046            this.connectionId = connectionId;
047            if (LOG.isDebugEnabled()) {
048                LOG.debug("XA Transaction new/begin : " + xid);
049            }
050        }
051    
052        @Override
053        public void commit(boolean onePhase) throws XAException, IOException {
054            if (LOG.isDebugEnabled()) {
055                LOG.debug("XA Transaction commit onePhase:" + onePhase + ", xid: " + xid);
056            }
057    
058            switch (getState()) {
059            case START_STATE:
060                // 1 phase commit, no work done.
061                checkForPreparedState(onePhase);
062                setStateFinished();
063                break;
064            case IN_USE_STATE:
065                // 1 phase commit, work done.
066                checkForPreparedState(onePhase);
067                doPrePrepare();
068                setStateFinished();
069                storeCommit(getTransactionId(), false, preCommitTask, postCommitTask);
070                break;
071            case PREPARED_STATE:
072                // 2 phase commit, work done.
073                // We would record commit here.
074                setStateFinished();
075                storeCommit(getTransactionId(), true, preCommitTask, postCommitTask);
076                break;
077            default:
078                illegalStateTransition("commit");
079            }
080        }
081    
082        private void storeCommit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit)
083                throws XAException, IOException {
084            try {
085                transactionStore.commit(getTransactionId(), wasPrepared, preCommitTask, postCommitTask);
086                waitPostCommitDone(postCommitTask);
087            } catch (XAException xae) {
088                throw xae;
089            } catch (Throwable t) {
090                LOG.warn("Store COMMIT FAILED: ", t);
091                rollback();
092                XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back.");
093                xae.errorCode = XAException.XA_RBOTHER;
094                xae.initCause(t);
095                throw xae;
096            }
097        }
098    
099        private void illegalStateTransition(String callName) throws XAException {
100            XAException xae = new XAException("Cannot call " + callName + " now.");
101            xae.errorCode = XAException.XAER_PROTO;
102            throw xae;
103        }
104    
105        private void checkForPreparedState(boolean onePhase) throws XAException {
106            if (!onePhase) {
107                XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared.");
108                xae.errorCode = XAException.XAER_PROTO;
109                throw xae;
110            }
111        }
112    
113        private void doPrePrepare() throws XAException, IOException {
114            try {
115                prePrepare();
116            } catch (XAException e) {
117                throw e;
118            } catch (Throwable e) {
119                LOG.warn("PRE-PREPARE FAILED: ", e);
120                rollback();
121                XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back.");
122                xae.errorCode = XAException.XA_RBOTHER;
123                xae.initCause(e);
124                throw xae;
125            }
126        }
127    
128        @Override
129        public void rollback() throws XAException, IOException {
130    
131            if (LOG.isDebugEnabled()) {
132                LOG.debug("XA Transaction rollback: " + xid);
133            }
134    
135            switch (getState()) {
136            case START_STATE:
137                // 1 phase rollback no work done.
138                setStateFinished();
139                break;
140            case IN_USE_STATE:
141                // 1 phase rollback work done.
142                setStateFinished();
143                transactionStore.rollback(getTransactionId());
144                doPostRollback();
145                break;
146            case PREPARED_STATE:
147                // 2 phase rollback work done.
148                setStateFinished();
149                transactionStore.rollback(getTransactionId());
150                doPostRollback();
151                break;
152            case FINISHED_STATE:
153                // failure to commit
154                transactionStore.rollback(getTransactionId());
155                doPostRollback();
156                break;
157            default:
158                throw new XAException("Invalid state");
159            }
160    
161        }
162    
163        private void doPostRollback() throws XAException {
164            try {
165                fireAfterRollback();
166            } catch (Throwable e) {
167                // I guess this could happen. Post commit task failed
168                // to execute properly.
169                LOG.warn("POST ROLLBACK FAILED: ", e);
170                XAException xae = new XAException("POST ROLLBACK FAILED");
171                xae.errorCode = XAException.XAER_RMERR;
172                xae.initCause(e);
173                throw xae;
174            }
175        }
176    
177        @Override
178        public int prepare() throws XAException, IOException {
179            if (LOG.isDebugEnabled()) {
180                LOG.debug("XA Transaction prepare: " + xid);
181            }
182    
183            switch (getState()) {
184            case START_STATE:
185                // No work done.. no commit/rollback needed.
186                setStateFinished();
187                return XAResource.XA_RDONLY;
188            case IN_USE_STATE:
189                // We would record prepare here.
190                doPrePrepare();
191                setState(Transaction.PREPARED_STATE);
192                transactionStore.prepare(getTransactionId());
193                return XAResource.XA_OK;
194            default:
195                illegalStateTransition("prepare");
196                return XAResource.XA_RDONLY;
197            }
198        }
199    
200        private void setStateFinished() {
201            setState(Transaction.FINISHED_STATE);
202            broker.removeTransaction(xid);
203        }
204    
205        public ConnectionId getConnectionId() {
206            return connectionId;
207        }
208    
209        @Override
210        public TransactionId getTransactionId() {
211            return xid;
212        }
213        
214        @Override
215        public Logger getLog() {
216            return LOG;
217        }
218    
219        public XATransactionId getXid() {
220            return xid;
221        }
222    }