001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.io.InterruptedIOException;
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.HashMap;
023    import java.util.List;
024    
025    import javax.jms.JMSException;
026    import javax.jms.TransactionInProgressException;
027    import javax.jms.TransactionRolledBackException;
028    import javax.transaction.xa.XAException;
029    import javax.transaction.xa.XAResource;
030    import javax.transaction.xa.Xid;
031    
032    import org.apache.activemq.command.Command;
033    import org.apache.activemq.command.ConnectionId;
034    import org.apache.activemq.command.DataArrayResponse;
035    import org.apache.activemq.command.DataStructure;
036    import org.apache.activemq.command.IntegerResponse;
037    import org.apache.activemq.command.LocalTransactionId;
038    import org.apache.activemq.command.Response;
039    import org.apache.activemq.command.TransactionId;
040    import org.apache.activemq.command.TransactionInfo;
041    import org.apache.activemq.command.XATransactionId;
042    import org.apache.activemq.transaction.Synchronization;
043    import org.apache.activemq.util.JMSExceptionSupport;
044    import org.apache.activemq.util.LongSequenceGenerator;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * A TransactionContext provides the means to control a JMS transaction. It
050     * provides a local transaction interface and also an XAResource interface. <p/>
051     * An application server controls the transactional assignment of an XASession
052     * by obtaining its XAResource. It uses the XAResource to assign the session to
053     * a transaction, prepare and commit work on the transaction, and so on. <p/> An
054     * XAResource provides some fairly sophisticated facilities for interleaving
055     * work on multiple transactions, recovering a list of transactions in progress,
056     * and so on. A JTA aware JMS provider must fully implement this functionality.
057     * This could be done by using the services of a database that supports XA, or a
058     * JMS provider may choose to implement this functionality from scratch. <p/>
059     *
060     *
061     * @see javax.jms.Session
062     * @see javax.jms.QueueSession
063     * @see javax.jms.TopicSession
064     * @see javax.jms.XASession
065     */
066    public class TransactionContext implements XAResource {
067    
068        private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
069    
070        // XATransactionId -> ArrayList of TransactionContext objects
071        private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
072                    new HashMap<TransactionId, List<TransactionContext>>();
073    
074        private final ActiveMQConnection connection;
075        private final LongSequenceGenerator localTransactionIdGenerator;
076        private final ConnectionId connectionId;
077        private List<Synchronization> synchronizations;
078    
079        // To track XA transactions.
080        private Xid associatedXid;
081        private TransactionId transactionId;
082        private LocalTransactionEventListener localTransactionEventListener;
083        private int beforeEndIndex;
084    
085        public TransactionContext(ActiveMQConnection connection) {
086            this.connection = connection;
087            this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
088            this.connectionId = connection.getConnectionInfo().getConnectionId();
089        }
090    
091        public boolean isInXATransaction() {
092            if (transactionId != null && transactionId.isXATransaction()) {
093                    return true;
094            } else {
095                    if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) {
096                            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
097                                    for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
098                                            if (transactions.contains(this)) {
099                                                    return true;
100                                            }
101                                    }
102                            }
103                    }
104            }
105    
106            return false;
107        }
108    
109        public boolean isInLocalTransaction() {
110            return transactionId != null && transactionId.isLocalTransaction();
111        }
112    
113        public boolean isInTransaction() {
114            return transactionId != null;
115        }
116    
117        /**
118         * @return Returns the localTransactionEventListener.
119         */
120        public LocalTransactionEventListener getLocalTransactionEventListener() {
121            return localTransactionEventListener;
122        }
123    
124        /**
125         * Used by the resource adapter to listen to transaction events.
126         *
127         * @param localTransactionEventListener The localTransactionEventListener to
128         *                set.
129         */
130        public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
131            this.localTransactionEventListener = localTransactionEventListener;
132        }
133    
134        // ///////////////////////////////////////////////////////////
135        //
136        // Methods that work with the Synchronization objects registered with
137        // the transaction.
138        //
139        // ///////////////////////////////////////////////////////////
140    
141        public void addSynchronization(Synchronization s) {
142            if (synchronizations == null) {
143                synchronizations = new ArrayList<Synchronization>(10);
144            }
145            synchronizations.add(s);
146        }
147    
148        private void afterRollback() throws JMSException {
149            if (synchronizations == null) {
150                return;
151            }
152    
153            Throwable firstException = null;
154            int size = synchronizations.size();
155            for (int i = 0; i < size; i++) {
156                try {
157                    synchronizations.get(i).afterRollback();
158                } catch (Throwable t) {
159                    LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t);
160                    if (firstException == null) {
161                        firstException = t;
162                    }
163                }
164            }
165            synchronizations = null;
166            if (firstException != null) {
167                throw JMSExceptionSupport.create(firstException);
168            }
169        }
170    
171        private void afterCommit() throws JMSException {
172            if (synchronizations == null) {
173                return;
174            }
175    
176            Throwable firstException = null;
177            int size = synchronizations.size();
178            for (int i = 0; i < size; i++) {
179                try {
180                    synchronizations.get(i).afterCommit();
181                } catch (Throwable t) {
182                    LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t);
183                    if (firstException == null) {
184                        firstException = t;
185                    }
186                }
187            }
188            synchronizations = null;
189            if (firstException != null) {
190                throw JMSExceptionSupport.create(firstException);
191            }
192        }
193    
194        private void beforeEnd() throws JMSException {
195            if (synchronizations == null) {
196                return;
197            }
198    
199            int size = synchronizations.size();
200            try {
201                for (;beforeEndIndex < size;) {
202                    synchronizations.get(beforeEndIndex++).beforeEnd();
203                }
204            } catch (JMSException e) {
205                throw e;
206            } catch (Throwable e) {
207                throw JMSExceptionSupport.create(e);
208            }
209        }
210    
211        public TransactionId getTransactionId() {
212            return transactionId;
213        }
214    
215        // ///////////////////////////////////////////////////////////
216        //
217        // Local transaction interface.
218        //
219        // ///////////////////////////////////////////////////////////
220    
221        /**
222         * Start a local transaction.
223         * @throws javax.jms.JMSException on internal error
224         */
225        public void begin() throws JMSException {
226    
227            if (isInXATransaction()) {
228                throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
229            }
230    
231            if (transactionId == null) {
232                synchronizations = null;
233                beforeEndIndex = 0;
234                this.transactionId = new LocalTransactionId(connectionId, localTransactionIdGenerator.getNextSequenceId());
235                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
236                this.connection.ensureConnectionInfoSent();
237                this.connection.asyncSendPacket(info);
238    
239                // Notify the listener that the tx was started.
240                if (localTransactionEventListener != null) {
241                    localTransactionEventListener.beginEvent();
242                }
243                if (LOG.isDebugEnabled()) {
244                    LOG.debug("Begin:" + transactionId);
245                }
246            }
247    
248        }
249    
250        /**
251         * Rolls back any work done in this transaction and releases any locks
252         * currently held.
253         *
254         * @throws JMSException if the JMS provider fails to roll back the
255         *                 transaction due to some internal error.
256         * @throws javax.jms.IllegalStateException if the method is not called by a
257         *                 transacted session.
258         */
259        public void rollback() throws JMSException {
260            if (isInXATransaction()) {
261                throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
262            }
263    
264            try {
265                beforeEnd();
266            } catch (TransactionRolledBackException canOcurrOnFailover) {
267                LOG.warn("rollback processing error", canOcurrOnFailover);
268            }
269            if (transactionId != null) {
270                if (LOG.isDebugEnabled()) {
271                    LOG.debug("Rollback: "  + transactionId
272                    + " syncCount: "
273                    + (synchronizations != null ? synchronizations.size() : 0));
274                }
275    
276                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
277                this.transactionId = null;
278                //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
279                this.connection.syncSendPacket(info);
280                // Notify the listener that the tx was rolled back
281                if (localTransactionEventListener != null) {
282                    localTransactionEventListener.rollbackEvent();
283                }
284            }
285    
286            afterRollback();
287        }
288    
289        /**
290         * Commits all work done in this transaction and releases any locks
291         * currently held.
292         *
293         * @throws JMSException if the JMS provider fails to commit the transaction
294         *                 due to some internal error.
295         * @throws javax.jms.IllegalStateException if the method is not called by a
296         *                 transacted session.
297         */
298        public void commit() throws JMSException {
299            if (isInXATransaction()) {
300                throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
301            }
302    
303            try {
304                beforeEnd();
305            } catch (JMSException e) {
306                rollback();
307                throw e;
308            }
309    
310            // Only send commit if the transaction was started.
311            if (transactionId != null) {
312                if (LOG.isDebugEnabled()) {
313                    LOG.debug("Commit: "  + transactionId
314                            + " syncCount: "
315                            + (synchronizations != null ? synchronizations.size() : 0));
316                }
317    
318                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
319                this.transactionId = null;
320                // Notify the listener that the tx was committed back
321                try {
322                    syncSendPacketWithInterruptionHandling(info);
323                    if (localTransactionEventListener != null) {
324                        localTransactionEventListener.commitEvent();
325                    }
326                    afterCommit();
327                } catch (JMSException cause) {
328                    LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
329                    if (localTransactionEventListener != null) {
330                        localTransactionEventListener.rollbackEvent();
331                    }
332                    afterRollback();
333                    throw cause;
334                }
335    
336            }
337        }
338    
339        // ///////////////////////////////////////////////////////////
340        //
341        // XAResource Implementation
342        //
343        // ///////////////////////////////////////////////////////////
344        /**
345         * Associates a transaction with the resource.
346         */
347        public void start(Xid xid, int flags) throws XAException {
348    
349            if (LOG.isDebugEnabled()) {
350                LOG.debug("Start: " + xid);
351            }
352            if (isInLocalTransaction()) {
353                throw new XAException(XAException.XAER_PROTO);
354            }
355            // Are we already associated?
356            if (associatedXid != null) {
357                throw new XAException(XAException.XAER_PROTO);
358            }
359    
360            // if ((flags & TMJOIN) == TMJOIN) {
361            // TODO: verify that the server has seen the xid
362            // // }
363            // if ((flags & TMJOIN) == TMRESUME) {
364            // // TODO: verify that the xid was suspended.
365            // }
366    
367            // associate
368            synchronizations = null;
369            beforeEndIndex = 0;
370            setXid(xid);
371        }
372    
373        /**
374         * @return connectionId for connection
375         */
376        private ConnectionId getConnectionId() {
377            return connection.getConnectionInfo().getConnectionId();
378        }
379    
380        public void end(Xid xid, int flags) throws XAException {
381    
382            if (LOG.isDebugEnabled()) {
383                LOG.debug("End: " + xid);
384            }
385    
386            if (isInLocalTransaction()) {
387                throw new XAException(XAException.XAER_PROTO);
388            }
389    
390            if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
391                // You can only suspend the associated xid.
392                if (!equals(associatedXid, xid)) {
393                    throw new XAException(XAException.XAER_PROTO);
394                }
395    
396                // TODO: we may want to put the xid in a suspended list.
397                try {
398                    beforeEnd();
399                } catch (JMSException e) {
400                    throw toXAException(e);
401                }
402                setXid(null);
403            } else if ((flags & TMSUCCESS) == TMSUCCESS) {
404                // set to null if this is the current xid.
405                // otherwise this could be an asynchronous success call
406                if (equals(associatedXid, xid)) {
407                    try {
408                        beforeEnd();
409                    } catch (JMSException e) {
410                        throw toXAException(e);
411                    }
412                    setXid(null);
413                }
414            } else {
415                throw new XAException(XAException.XAER_INVAL);
416            }
417        }
418    
419        private boolean equals(Xid xid1, Xid xid2) {
420            if (xid1 == xid2) {
421                return true;
422            }
423            if (xid1 == null ^ xid2 == null) {
424                return false;
425            }
426            return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
427                   && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
428        }
429    
430        public int prepare(Xid xid) throws XAException {
431            if (LOG.isDebugEnabled()) {
432                LOG.debug("Prepare: " + xid);
433            }
434    
435            // We allow interleaving multiple transactions, so
436            // we don't limit prepare to the associated xid.
437            XATransactionId x;
438            // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
439            // called first
440            if (xid == null || (equals(associatedXid, xid))) {
441                throw new XAException(XAException.XAER_PROTO);
442            } else {
443                // TODO: cache the known xids so we don't keep recreating this one??
444                x = new XATransactionId(xid);
445            }
446    
447            try {
448                TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
449    
450                // Find out if the server wants to commit or rollback.
451                IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
452                if (XAResource.XA_RDONLY == response.getResult()) {
453                    // transaction stops now, may be syncs that need a callback
454                            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
455                            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
456                            if (l != null && !l.isEmpty()) {
457                                if (LOG.isDebugEnabled()) {
458                                    LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid);
459                                }
460                                for (TransactionContext ctx : l) {
461                                    ctx.afterCommit();
462                                }
463                            }
464                            }
465                }
466                return response.getResult();
467    
468            } catch (JMSException e) {
469                LOG.warn("prepare of: " + x + " failed with: " + e, e);
470                    synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
471                        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
472                        if (l != null && !l.isEmpty()) {
473                            for (TransactionContext ctx : l) {
474                                try {
475                                    ctx.afterRollback();
476                                } catch (Throwable ignored) {
477                                    if (LOG.isDebugEnabled()) {
478                                        LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " +
479                                                      x + ", context: " + ctx, ignored);
480                                    }
481                                }
482                            }
483                        }
484                    }
485                throw toXAException(e);
486            }
487        }
488    
489        public void rollback(Xid xid) throws XAException {
490    
491            if (LOG.isDebugEnabled()) {
492                LOG.debug("Rollback: " + xid);
493            }
494    
495            // We allow interleaving multiple transactions, so
496            // we don't limit rollback to the associated xid.
497            XATransactionId x;
498            if (xid == null) {
499                throw new XAException(XAException.XAER_PROTO);
500            }
501            if (equals(associatedXid, xid)) {
502                // I think this can happen even without an end(xid) call. Need to
503                // check spec.
504                x = (XATransactionId)transactionId;
505            } else {
506                x = new XATransactionId(xid);
507            }
508    
509            try {
510                this.connection.checkClosedOrFailed();
511                this.connection.ensureConnectionInfoSent();
512    
513                // Let the server know that the tx is rollback.
514                TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
515                syncSendPacketWithInterruptionHandling(info);
516    
517                    synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
518                        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
519                        if (l != null && !l.isEmpty()) {
520                            for (TransactionContext ctx : l) {
521                                ctx.afterRollback();
522                            }
523                        }
524                    }
525            } catch (JMSException e) {
526                throw toXAException(e);
527            }
528        }
529    
530        // XAResource interface
531        public void commit(Xid xid, boolean onePhase) throws XAException {
532    
533            if (LOG.isDebugEnabled()) {
534                LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
535            }
536    
537            // We allow interleaving multiple transactions, so
538            // we don't limit commit to the associated xid.
539            XATransactionId x;
540            if (xid == null || (equals(associatedXid, xid))) {
541                // should never happen, end(xid,TMSUCCESS) must have been previously
542                // called
543                throw new XAException(XAException.XAER_PROTO);
544            } else {
545                x = new XATransactionId(xid);
546            }
547    
548            try {
549                this.connection.checkClosedOrFailed();
550                this.connection.ensureConnectionInfoSent();
551    
552                // Notify the server that the tx was committed back
553                TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
554    
555                syncSendPacketWithInterruptionHandling(info);
556    
557                    synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
558                        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
559                        if (l != null && !l.isEmpty()) {
560                            for (TransactionContext ctx : l) {
561                                try {
562                                    ctx.afterCommit();
563                                } catch (Exception ignored) {
564                                    LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored);
565                                }
566                            }
567                        }
568                    }
569    
570            } catch (JMSException e) {
571                LOG.warn("commit of: " + x + " failed with: " + e, e);
572                if (onePhase) {
573                            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
574                            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
575                            if (l != null && !l.isEmpty()) {
576                                for (TransactionContext ctx : l) {
577                                    try {
578                                        ctx.afterRollback();
579                                    } catch (Throwable ignored) {
580                                        if (LOG.isDebugEnabled()) {
581                                            LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
582                                        }
583                                    }
584                                }
585                            }
586                            }
587                }
588                throw toXAException(e);
589            }
590    
591        }
592    
593        public void forget(Xid xid) throws XAException {
594            if (LOG.isDebugEnabled()) {
595                LOG.debug("Forget: " + xid);
596            }
597    
598            // We allow interleaving multiple transactions, so
599            // we don't limit forget to the associated xid.
600            XATransactionId x;
601            if (xid == null) {
602                throw new XAException(XAException.XAER_PROTO);
603            }
604            if (equals(associatedXid, xid)) {
605                // TODO determine if this can happen... I think not.
606                x = (XATransactionId)transactionId;
607            } else {
608                x = new XATransactionId(xid);
609            }
610    
611            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
612    
613            try {
614                // Tell the server to forget the transaction.
615                syncSendPacketWithInterruptionHandling(info);
616            } catch (JMSException e) {
617                throw toXAException(e);
618            }
619            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
620                    ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
621            }
622        }
623    
624        public boolean isSameRM(XAResource xaResource) throws XAException {
625            if (xaResource == null) {
626                return false;
627            }
628            if (!(xaResource instanceof TransactionContext)) {
629                return false;
630            }
631            TransactionContext xar = (TransactionContext)xaResource;
632            try {
633                return getResourceManagerId().equals(xar.getResourceManagerId());
634            } catch (Throwable e) {
635                throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
636            }
637        }
638    
639        public Xid[] recover(int flag) throws XAException {
640            if (LOG.isDebugEnabled()) {
641                LOG.debug("Recover: " + flag);
642            }
643    
644            TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
645            try {
646                this.connection.checkClosedOrFailed();
647                this.connection.ensureConnectionInfoSent();
648    
649                DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
650                DataStructure[] data = receipt.getData();
651                XATransactionId[] answer;
652                if (data instanceof XATransactionId[]) {
653                    answer = (XATransactionId[])data;
654                } else {
655                    answer = new XATransactionId[data.length];
656                    System.arraycopy(data, 0, answer, 0, data.length);
657                }
658                return answer;
659            } catch (JMSException e) {
660                throw toXAException(e);
661            }
662        }
663    
664        public int getTransactionTimeout() throws XAException {
665            return 0;
666        }
667    
668        public boolean setTransactionTimeout(int seconds) throws XAException {
669            return false;
670        }
671    
672        // ///////////////////////////////////////////////////////////
673        //
674        // Helper methods.
675        //
676        // ///////////////////////////////////////////////////////////
677        private String getResourceManagerId() throws JMSException {
678            return this.connection.getResourceManagerId();
679        }
680    
681        private void setXid(Xid xid) throws XAException {
682    
683            try {
684                this.connection.checkClosedOrFailed();
685                this.connection.ensureConnectionInfoSent();
686            } catch (JMSException e) {
687                throw toXAException(e);
688            }
689    
690            if (xid != null) {
691                // associate
692                associatedXid = xid;
693                transactionId = new XATransactionId(xid);
694    
695                TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.BEGIN);
696                try {
697                    this.connection.asyncSendPacket(info);
698                    if (LOG.isDebugEnabled()) {
699                        LOG.debug("Started XA transaction: " + transactionId);
700                    }
701                } catch (JMSException e) {
702                    throw toXAException(e);
703                }
704    
705            } else {
706    
707                if (transactionId != null) {
708                    TransactionInfo info = new TransactionInfo(connectionId, transactionId, TransactionInfo.END);
709                    try {
710                        syncSendPacketWithInterruptionHandling(info);
711                        if (LOG.isDebugEnabled()) {
712                            LOG.debug("Ended XA transaction: " + transactionId);
713                        }
714                    } catch (JMSException e) {
715                        throw toXAException(e);
716                    }
717    
718                    // Add our self to the list of contexts that are interested in
719                    // post commit/rollback events.
720                            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
721                            List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
722                            if (l == null) {
723                                l = new ArrayList<TransactionContext>(3);
724                                ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
725                                l.add(this);
726                            } else if (!l.contains(this)) {
727                                l.add(this);
728                            }
729                            }
730                }
731    
732                // dis-associate
733                associatedXid = null;
734                transactionId = null;
735            }
736        }
737    
738        /**
739         * Sends the given command. Also sends the command in case of interruption,
740         * so that important commands like rollback and commit are never interrupted.
741         * If interruption occurred, set the interruption state of the current
742         * after performing the action again.
743         *
744         * @return the response
745         */
746        private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException {
747            try {
748                return this.connection.syncSendPacket(command);
749            } catch (JMSException e) {
750                if (e.getLinkedException() instanceof InterruptedIOException) {
751                    try {
752                        Thread.interrupted();
753                        return this.connection.syncSendPacket(command);
754                    } finally {
755                        Thread.currentThread().interrupt();
756                    }
757                }
758    
759                throw e;
760            }
761        }
762    
763        /**
764         * Converts a JMSException from the server to an XAException. if the
765         * JMSException contained a linked XAException that is returned instead.
766         *
767         * @param e JMSException to convert
768         * @return XAException wrapping original exception or its message
769         */
770        private XAException toXAException(JMSException e) {
771            if (e.getCause() != null && e.getCause() instanceof XAException) {
772                XAException original = (XAException)e.getCause();
773                XAException xae = new XAException(original.getMessage());
774                xae.errorCode = original.errorCode;
775                xae.initCause(original);
776                return xae;
777            }
778    
779            XAException xae = new XAException(e.getMessage());
780            xae.errorCode = XAException.XAER_RMFAIL;
781            xae.initCause(e);
782            return xae;
783        }
784    
785        public ActiveMQConnection getConnection() {
786            return connection;
787        }
788    
789        public void cleanup() {
790            associatedXid = null;
791            transactionId = null;
792        }
793    
794        @Override
795        public String toString() {
796            return "TransactionContext{" +
797                    "transactionId=" + transactionId +
798                    '}';
799        }
800    }