001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.util.ArrayList;
020import java.util.Arrays;
021import java.util.HashMap;
022import java.util.List;
023
024import javax.jms.JMSException;
025import javax.jms.TransactionInProgressException;
026import javax.jms.TransactionRolledBackException;
027import javax.transaction.xa.XAException;
028import javax.transaction.xa.XAResource;
029import javax.transaction.xa.Xid;
030
031import org.apache.activemq.command.ConnectionId;
032import org.apache.activemq.command.DataArrayResponse;
033import org.apache.activemq.command.DataStructure;
034import org.apache.activemq.command.IntegerResponse;
035import org.apache.activemq.command.LocalTransactionId;
036import org.apache.activemq.command.TransactionId;
037import org.apache.activemq.command.TransactionInfo;
038import org.apache.activemq.command.XATransactionId;
039import org.apache.activemq.transaction.Synchronization;
040import org.apache.activemq.util.JMSExceptionSupport;
041import org.apache.activemq.util.LongSequenceGenerator;
042import org.apache.activemq.util.XASupport;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * A TransactionContext provides the means to control a JMS transaction. It
048 * provides a local transaction interface and also an XAResource interface. <p/>
049 * An application server controls the transactional assignment of an XASession
050 * by obtaining its XAResource. It uses the XAResource to assign the session to
051 * a transaction, prepare and commit work on the transaction, and so on. <p/> An
052 * XAResource provides some fairly sophisticated facilities for interleaving
053 * work on multiple transactions, recovering a list of transactions in progress,
054 * and so on. A JTA aware JMS provider must fully implement this functionality.
055 * This could be done by using the services of a database that supports XA, or a
056 * JMS provider may choose to implement this functionality from scratch. <p/>
057 *
058 *
059 * @see javax.jms.Session
060 * @see javax.jms.QueueSession
061 * @see javax.jms.TopicSession
062 * @see javax.jms.XASession
063 */
064public class TransactionContext implements XAResource {
065
066    public static final String xaErrorCodeMarker = "xaErrorCode:";
067    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
068
069    // XATransactionId -> ArrayList of TransactionContext objects
070    private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
071            new HashMap<TransactionId, List<TransactionContext>>();
072
073    private ActiveMQConnection connection;
074    private final LongSequenceGenerator localTransactionIdGenerator;
075    private List<Synchronization> synchronizations;
076
077    // To track XA transactions.
078    private Xid associatedXid;
079    private TransactionId transactionId;
080    private LocalTransactionEventListener localTransactionEventListener;
081    private int beforeEndIndex;
082    private volatile boolean rollbackOnly;
083
084    // for RAR recovery
085    public TransactionContext() {
086        localTransactionIdGenerator = null;
087    }
088
089    public TransactionContext(ActiveMQConnection connection) {
090        this.connection = connection;
091        this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
092    }
093
094    public boolean isInXATransaction() {
095        if (transactionId != null && transactionId.isXATransaction()) {
096            return true;
097        } else {
098            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
099                for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
100                      if (transactions.contains(this)) {
101                          return true;
102                      }
103                }
104            }
105        }
106
107        return false;
108    }
109
110    public void setRollbackOnly(boolean val) {
111        rollbackOnly = val;
112    }
113
114    public boolean isInLocalTransaction() {
115        return transactionId != null && transactionId.isLocalTransaction();
116    }
117
118    public boolean isInTransaction() {
119        return transactionId != null;
120    }
121
122    /**
123     * @return Returns the localTransactionEventListener.
124     */
125    public LocalTransactionEventListener getLocalTransactionEventListener() {
126        return localTransactionEventListener;
127    }
128
129    /**
130     * Used by the resource adapter to listen to transaction events.
131     *
132     * @param localTransactionEventListener The localTransactionEventListener to
133     *                set.
134     */
135    public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
136        this.localTransactionEventListener = localTransactionEventListener;
137    }
138
139    // ///////////////////////////////////////////////////////////
140    //
141    // Methods that work with the Synchronization objects registered with
142    // the transaction.
143    //
144    // ///////////////////////////////////////////////////////////
145
146    public void addSynchronization(Synchronization s) {
147        if (synchronizations == null) {
148            synchronizations = new ArrayList<Synchronization>(10);
149        }
150        synchronizations.add(s);
151    }
152
153    private void afterRollback() throws JMSException {
154        if (synchronizations == null) {
155            return;
156        }
157
158        Throwable firstException = null;
159        int size = synchronizations.size();
160        for (int i = 0; i < size; i++) {
161            try {
162                synchronizations.get(i).afterRollback();
163            } catch (Throwable t) {
164                LOG.debug("Exception from afterRollback on {}", synchronizations.get(i), t);
165                if (firstException == null) {
166                    firstException = t;
167                }
168            }
169        }
170        synchronizations = null;
171        if (firstException != null) {
172            throw JMSExceptionSupport.create(firstException);
173        }
174    }
175
176    private void afterCommit() throws JMSException {
177        if (synchronizations == null) {
178            return;
179        }
180
181        Throwable firstException = null;
182        int size = synchronizations.size();
183        for (int i = 0; i < size; i++) {
184            try {
185                synchronizations.get(i).afterCommit();
186            } catch (Throwable t) {
187                LOG.debug("Exception from afterCommit on {}", synchronizations.get(i), t);
188                if (firstException == null) {
189                    firstException = t;
190                }
191            }
192        }
193        synchronizations = null;
194        if (firstException != null) {
195            throw JMSExceptionSupport.create(firstException);
196        }
197    }
198
199    private void beforeEnd() throws JMSException {
200        if (synchronizations == null) {
201            return;
202        }
203
204        int size = synchronizations.size();
205        try {
206            for (;beforeEndIndex < size;) {
207                synchronizations.get(beforeEndIndex++).beforeEnd();
208            }
209        } catch (JMSException e) {
210            throw e;
211        } catch (Throwable e) {
212            throw JMSExceptionSupport.create(e);
213        }
214    }
215
216    public TransactionId getTransactionId() {
217        return transactionId;
218    }
219
220    // ///////////////////////////////////////////////////////////
221    //
222    // Local transaction interface.
223    //
224    // ///////////////////////////////////////////////////////////
225
226    /**
227     * Start a local transaction.
228     * @throws javax.jms.JMSException on internal error
229     */
230    public void begin() throws JMSException {
231
232        if (isInXATransaction()) {
233            throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
234        }
235
236        if (transactionId == null) {
237            synchronizations = null;
238            beforeEndIndex = 0;
239            setRollbackOnly(false);
240            this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId());
241            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
242            this.connection.ensureConnectionInfoSent();
243            this.connection.asyncSendPacket(info);
244
245            // Notify the listener that the tx was started.
246            if (localTransactionEventListener != null) {
247                localTransactionEventListener.beginEvent();
248            }
249
250            LOG.debug("Begin:{}", transactionId);
251        }
252    }
253
254    /**
255     * Rolls back any work done in this transaction and releases any locks
256     * currently held.
257     *
258     * @throws JMSException if the JMS provider fails to roll back the
259     *                 transaction due to some internal error.
260     * @throws javax.jms.IllegalStateException if the method is not called by a
261     *                 transacted session.
262     */
263    public void rollback() throws JMSException {
264        if (isInXATransaction()) {
265            throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
266        }
267
268        try {
269            beforeEnd();
270        } catch (TransactionRolledBackException canOcurrOnFailover) {
271            LOG.warn("rollback processing error", canOcurrOnFailover);
272        }
273        if (transactionId != null) {
274            LOG.debug("Rollback: {} syncCount: {}",
275                transactionId, (synchronizations != null ? synchronizations.size() : 0));
276
277            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
278            this.transactionId = null;
279            //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
280            this.connection.syncSendPacket(info);
281            // Notify the listener that the tx was rolled back
282            if (localTransactionEventListener != null) {
283                localTransactionEventListener.rollbackEvent();
284            }
285        }
286
287        afterRollback();
288    }
289
290    /**
291     * Commits all work done in this transaction and releases any locks
292     * currently held.
293     *
294     * @throws JMSException if the JMS provider fails to commit the transaction
295     *                 due to some internal error.
296     * @throws javax.jms.IllegalStateException if the method is not called by a
297     *                 transacted session.
298     */
299    public void commit() throws JMSException {
300        if (isInXATransaction()) {
301            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
302        }
303
304        try {
305            beforeEnd();
306        } catch (JMSException e) {
307            rollback();
308            throw e;
309        }
310
311        if (transactionId != null && rollbackOnly) {
312            final String message = "Commit of " + transactionId + "  failed due to rollback only request; typically due to failover with pending acks";
313            try {
314                rollback();
315            } finally {
316                LOG.warn(message);
317                throw new TransactionRolledBackException(message);
318            }
319        }
320
321        // Only send commit if the transaction was started.
322        if (transactionId != null) {
323            LOG.debug("Commit: {} syncCount: {}",
324                transactionId, (synchronizations != null ? synchronizations.size() : 0));
325
326            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
327            this.transactionId = null;
328            // Notify the listener that the tx was committed back
329            try {
330                this.connection.syncSendPacket(info);
331                if (localTransactionEventListener != null) {
332                    localTransactionEventListener.commitEvent();
333                }
334                afterCommit();
335            } catch (JMSException cause) {
336                LOG.info("commit failed for transaction {}", info.getTransactionId(), cause);
337                if (localTransactionEventListener != null) {
338                    localTransactionEventListener.rollbackEvent();
339                }
340                afterRollback();
341                throw cause;
342            }
343
344        }
345    }
346
347    // ///////////////////////////////////////////////////////////
348    //
349    // XAResource Implementation
350    //
351    // ///////////////////////////////////////////////////////////
352    /**
353     * Associates a transaction with the resource.
354     */
355    @Override
356    public void start(Xid xid, int flags) throws XAException {
357
358        LOG.debug("Start: {}, flags: {}", xid, XASupport.toString(flags));
359
360        if (isInLocalTransaction()) {
361            throw new XAException(XAException.XAER_PROTO);
362        }
363        // Are we already associated?
364        if (associatedXid != null) {
365            throw new XAException(XAException.XAER_PROTO);
366        }
367
368        // if ((flags & TMJOIN) == TMJOIN) {
369        // TODO: verify that the server has seen the xid
370        // // }
371        // if ((flags & TMRESUME) == TMRESUME) {
372        // // TODO: verify that the xid was suspended.
373        // }
374
375        // associate
376        synchronizations = null;
377        beforeEndIndex = 0;
378        setRollbackOnly(false);
379        setXid(xid);
380    }
381
382    /**
383     * @return connectionId for connection
384     */
385    private ConnectionId getConnectionId() {
386        return connection.getConnectionInfo().getConnectionId();
387    }
388
389    @Override
390    public void end(Xid xid, int flags) throws XAException {
391
392        LOG.debug("End: {}, flags: {}", xid, XASupport.toString(flags));
393
394        if (isInLocalTransaction()) {
395            throw new XAException(XAException.XAER_PROTO);
396        }
397
398        if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
399            // You can only suspend the associated xid.
400            if (!equals(associatedXid, xid)) {
401                throw new XAException(XAException.XAER_PROTO);
402            }
403            invokeBeforeEnd();
404        } else if ((flags & TMSUCCESS) == TMSUCCESS) {
405            // set to null if this is the current xid.
406            // otherwise this could be an asynchronous success call
407            if (equals(associatedXid, xid)) {
408                invokeBeforeEnd();
409            }
410        } else {
411            throw new XAException(XAException.XAER_INVAL);
412        }
413    }
414
415    private void invokeBeforeEnd() throws XAException {
416        boolean throwingException = false;
417        try {
418            beforeEnd();
419        } catch (JMSException e) {
420            throwingException = true;
421            throw toXAException(e);
422        } finally {
423            try {
424                setXid(null);
425            } catch (XAException ignoreIfWillMask){
426                if (!throwingException) {
427                    throw ignoreIfWillMask;
428                }
429            }
430        }
431    }
432
433    private boolean equals(Xid xid1, Xid xid2) {
434        if (xid1 == xid2) {
435            return true;
436        }
437        if (xid1 == null ^ xid2 == null) {
438            return false;
439        }
440        return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
441               && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
442    }
443
444    @Override
445    public int prepare(Xid xid) throws XAException {
446        LOG.debug("Prepare: {}", xid);
447
448        // We allow interleaving multiple transactions, so
449        // we don't limit prepare to the associated xid.
450        XATransactionId x;
451        // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
452        // called first
453        if (xid == null || (equals(associatedXid, xid))) {
454            throw new XAException(XAException.XAER_PROTO);
455        } else {
456            // TODO: cache the known xids so we don't keep recreating this one??
457            x = new XATransactionId(xid);
458        }
459
460        if (rollbackOnly) {
461            LOG.warn("prepare of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
462            throw new XAException(XAException.XA_RBINTEGRITY);
463        }
464
465        try {
466            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
467
468            // Find out if the server wants to commit or rollback.
469            IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
470            if (XAResource.XA_RDONLY == response.getResult()) {
471                // transaction stops now, may be syncs that need a callback
472                List<TransactionContext> l;
473                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
474                    l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
475                }
476                // After commit may be expensive and can deadlock, do it outside global synch block
477                // No risk for concurrent updates as we own the list now
478                if (l != null) {
479                    if(! l.isEmpty()) {
480                        LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid);
481                        for (TransactionContext ctx : l) {
482                            ctx.afterCommit();
483                        }
484                    }
485                }
486            }
487            return response.getResult();
488
489        } catch (JMSException e) {
490            LOG.warn("prepare of: " + x + " failed with: " + e, e);
491            List<TransactionContext> l;
492            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
493                l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
494            }
495            // After rollback may be expensive and can deadlock, do it outside global synch block
496            // No risk for concurrent updates as we own the list now
497            if (l != null) {
498                for (TransactionContext ctx : l) {
499                    try {
500                        ctx.afterRollback();
501                    } catch (Throwable ignored) {
502                        LOG.debug("failed to firing afterRollback callbacks on prepare " +
503                                  "failure, txid: {}, context: {}", x, ctx, ignored);
504                    }
505                }
506            }
507            throw toXAException(e);
508        }
509    }
510
511    @Override
512    public void rollback(Xid xid) throws XAException {
513
514        if (LOG.isDebugEnabled()) {
515            LOG.debug("Rollback: " + xid);
516        }
517
518        // We allow interleaving multiple transactions, so
519        // we don't limit rollback to the associated xid.
520        XATransactionId x;
521        if (xid == null) {
522            throw new XAException(XAException.XAER_PROTO);
523        }
524        if (equals(associatedXid, xid)) {
525            // I think this can happen even without an end(xid) call. Need to
526            // check spec.
527            x = (XATransactionId)transactionId;
528        } else {
529            x = new XATransactionId(xid);
530        }
531
532        try {
533            this.connection.checkClosedOrFailed();
534            this.connection.ensureConnectionInfoSent();
535
536            // Let the server know that the tx is rollback.
537            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
538            this.connection.syncSendPacket(info);
539
540            List<TransactionContext> l;
541            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
542                l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
543            }
544            // After rollback may be expensive and can deadlock, do it outside global synch block
545            // No risk for concurrent updates as we own the list now
546            if (l != null) {
547                for (TransactionContext ctx : l) {
548                    ctx.afterRollback();
549                }                  
550            }
551        } catch (JMSException e) {
552            throw toXAException(e);
553        }
554    }
555
556    // XAResource interface
557    @Override
558    public void commit(Xid xid, boolean onePhase) throws XAException {
559
560        LOG.debug("Commit: {}, onePhase={}", xid, onePhase);
561
562        // We allow interleaving multiple transactions, so
563        // we don't limit commit to the associated xid.
564        XATransactionId x;
565        if (xid == null || (equals(associatedXid, xid))) {
566            // should never happen, end(xid,TMSUCCESS) must have been previously
567            // called
568            throw new XAException(XAException.XAER_PROTO);
569        } else {
570            x = new XATransactionId(xid);
571        }
572
573        if (rollbackOnly) {
574             LOG.warn("commit of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
575             throw new XAException(XAException.XA_RBINTEGRITY);
576         }
577
578        try {
579            this.connection.checkClosedOrFailed();
580            this.connection.ensureConnectionInfoSent();
581
582            // Notify the server that the tx was committed back
583            TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
584
585            this.connection.syncSendPacket(info);
586
587            List<TransactionContext> l;
588            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
589                l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
590            }
591            // After commit may be expensive and can deadlock, do it outside global synch block
592            // No risk for concurrent updates as we own the list now
593            if (l != null) {
594                for (TransactionContext ctx : l) {
595                    try {
596                        ctx.afterCommit();
597                    } catch (Exception ignored) {
598                        LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored);
599                    }
600                }
601            }
602
603        } catch (JMSException e) {
604            LOG.warn("commit of: " + x + " failed with: " + e, e);
605            if (onePhase) {
606                List<TransactionContext> l;
607                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
608                    l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
609                }
610                // After rollback may be expensive and can deadlock, do it outside global synch block
611                // No risk for concurrent updates as we own the list now
612                if (l != null) {
613                    for (TransactionContext ctx : l) {
614                        try {
615                            ctx.afterRollback();
616                        } catch (Throwable ignored) {
617                            LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored);
618                        }
619                    }
620                }
621            }
622            throw toXAException(e);
623        }
624    }
625
626    @Override
627    public void forget(Xid xid) throws XAException {
628        LOG.debug("Forget: {}", xid);
629
630        // We allow interleaving multiple transactions, so
631        // we don't limit forget to the associated xid.
632        XATransactionId x;
633        if (xid == null) {
634            throw new XAException(XAException.XAER_PROTO);
635        }
636        if (equals(associatedXid, xid)) {
637            // TODO determine if this can happen... I think not.
638            x = (XATransactionId)transactionId;
639        } else {
640            x = new XATransactionId(xid);
641        }
642
643        TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
644
645        try {
646            // Tell the server to forget the transaction.
647            this.connection.syncSendPacket(info);
648        } catch (JMSException e) {
649            throw toXAException(e);
650        }
651        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
652            ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
653        }
654    }
655
656    @Override
657    public boolean isSameRM(XAResource xaResource) throws XAException {
658        if (xaResource == null) {
659            return false;
660        }
661        if (!(xaResource instanceof TransactionContext)) {
662            return false;
663        }
664        TransactionContext xar = (TransactionContext)xaResource;
665        try {
666            return getResourceManagerId().equals(xar.getResourceManagerId());
667        } catch (Throwable e) {
668            throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
669        }
670    }
671
672    @Override
673    public Xid[] recover(int flag) throws XAException {
674        LOG.debug("recover({})", flag);
675
676        TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
677        try {
678            this.connection.checkClosedOrFailed();
679            this.connection.ensureConnectionInfoSent();
680
681            DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
682            DataStructure[] data = receipt.getData();
683            XATransactionId[] answer;
684            if (data instanceof XATransactionId[]) {
685                answer = (XATransactionId[])data;
686            } else {
687                answer = new XATransactionId[data.length];
688                System.arraycopy(data, 0, answer, 0, data.length);
689            }
690            LOG.debug("recover({})={}", flag, answer);
691            return answer;
692        } catch (JMSException e) {
693            throw toXAException(e);
694        }
695    }
696
697    @Override
698    public int getTransactionTimeout() throws XAException {
699        return 0;
700    }
701
702    @Override
703    public boolean setTransactionTimeout(int seconds) throws XAException {
704        return false;
705    }
706
707    // ///////////////////////////////////////////////////////////
708    //
709    // Helper methods.
710    //
711    // ///////////////////////////////////////////////////////////
712    protected String getResourceManagerId() throws JMSException {
713        return this.connection.getResourceManagerId();
714    }
715
716    private void setXid(Xid xid) throws XAException {
717
718        try {
719            this.connection.checkClosedOrFailed();
720            this.connection.ensureConnectionInfoSent();
721        } catch (JMSException e) {
722            disassociate();
723            throw toXAException(e);
724        }
725
726        if (xid != null) {
727            // associate
728            associatedXid = xid;
729            transactionId = new XATransactionId(xid);
730
731            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
732            try {
733                this.connection.asyncSendPacket(info);
734                LOG.debug("{} started XA transaction {}", this, transactionId);
735            } catch (JMSException e) {
736                disassociate();
737                throw toXAException(e);
738            }
739
740        } else {
741
742            if (transactionId != null) {
743                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END);
744                try {
745                    this.connection.syncSendPacket(info);
746                    LOG.debug("{} ended XA transaction {}", this, transactionId);
747                } catch (JMSException e) {
748                    disassociate();
749                    throw toXAException(e);
750                }
751
752                // Add our self to the list of contexts that are interested in
753                // post commit/rollback events.
754                List<TransactionContext> l;
755                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
756                    l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
757                    if (l == null) {
758                        l = new ArrayList<TransactionContext>(3);
759                        ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
760                    }
761                    if (!l.contains(this)) {
762                        l.add(this);
763                    }
764                }
765            }
766
767            disassociate();
768        }
769    }
770
771    private void disassociate() {
772         // dis-associate
773         associatedXid = null;
774         transactionId = null;
775    }
776
777    /**
778     * Converts a JMSException from the server to an XAException. if the
779     * JMSException contained a linked XAException that is returned instead.
780     *
781     * @param e JMSException to convert
782     * @return XAException wrapping original exception or its message
783     */
784    private XAException toXAException(JMSException e) {
785        if (e.getCause() != null && e.getCause() instanceof XAException) {
786            XAException original = (XAException)e.getCause();
787            XAException xae = new XAException(original.getMessage());
788            xae.errorCode = original.errorCode;
789            if (xae.errorCode == XA_OK) {
790                // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable
791                xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR);
792            }
793            xae.initCause(original);
794            return xae;
795        }
796
797        XAException xae = new XAException(e.getMessage());
798        xae.errorCode = XAException.XAER_RMFAIL;
799        xae.initCause(e);
800        return xae;
801    }
802
803    private int parseFromMessageOr(String message, int fallbackCode) {
804        final String marker = "xaErrorCode:";
805        final int index = message.lastIndexOf(marker);
806        if (index > -1) {
807            try {
808                return Integer.parseInt(message.substring(index + marker.length()));
809            } catch (Exception ignored) {}
810        }
811        return fallbackCode;
812    }
813
814    public ActiveMQConnection getConnection() {
815        return connection;
816    }
817
818    // for RAR xa recovery where xaresource connection is per request
819    public ActiveMQConnection setConnection(ActiveMQConnection connection) {
820        ActiveMQConnection existing = this.connection;
821        this.connection = connection;
822        return existing;
823    }
824
825    public void cleanup() {
826        associatedXid = null;
827        transactionId = null;
828    }
829
830    @Override
831    public String toString() {
832        return "TransactionContext{" +
833                "transactionId=" + transactionId +
834                ",connection=" + connection +
835                '}';
836    }
837}