001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019
020import java.util.ArrayList;
021import java.util.Iterator;
022import java.util.LinkedHashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026
027import javax.jms.JMSException;
028import javax.transaction.xa.XAException;
029
030import org.apache.activemq.ActiveMQMessageAudit;
031import org.apache.activemq.broker.jmx.ManagedRegionBroker;
032import org.apache.activemq.broker.region.Destination;
033import org.apache.activemq.command.ActiveMQDestination;
034import org.apache.activemq.command.BaseCommand;
035import org.apache.activemq.command.ConnectionInfo;
036import org.apache.activemq.command.LocalTransactionId;
037import org.apache.activemq.command.Message;
038import org.apache.activemq.command.MessageAck;
039import org.apache.activemq.command.ProducerInfo;
040import org.apache.activemq.command.TransactionId;
041import org.apache.activemq.command.XATransactionId;
042import org.apache.activemq.state.ProducerState;
043import org.apache.activemq.store.TransactionRecoveryListener;
044import org.apache.activemq.store.TransactionStore;
045import org.apache.activemq.transaction.LocalTransaction;
046import org.apache.activemq.transaction.Synchronization;
047import org.apache.activemq.transaction.Transaction;
048import org.apache.activemq.transaction.XATransaction;
049import org.apache.activemq.util.IOExceptionSupport;
050import org.apache.activemq.util.WrappedException;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 * This broker filter handles the transaction related operations in the Broker
056 * interface.
057 * 
058 * 
059 */
060public class TransactionBroker extends BrokerFilter {
061
062    private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class);
063
064    // The prepared XA transactions.
065    private TransactionStore transactionStore;
066    private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
067
068    public TransactionBroker(Broker next, TransactionStore transactionStore) {
069        super(next);
070        this.transactionStore = transactionStore;
071    }
072
073    // ////////////////////////////////////////////////////////////////////////////
074    //
075    // Life cycle Methods
076    //
077    // ////////////////////////////////////////////////////////////////////////////
078
079    /**
080     * Recovers any prepared transactions.
081     */
082    public void start() throws Exception {
083        transactionStore.start();
084        try {
085            final ConnectionContext context = new ConnectionContext();
086            context.setBroker(this);
087            context.setInRecoveryMode(true);
088            context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
089            context.setProducerFlowControl(false);
090            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
091            producerExchange.setMutable(true);
092            producerExchange.setConnectionContext(context);
093            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
094            final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
095            consumerExchange.setConnectionContext(context);
096            transactionStore.recover(new TransactionRecoveryListener() {
097                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
098                    try {
099                        beginTransaction(context, xid);
100                        XATransaction transaction = (XATransaction) getTransaction(context, xid, false);
101                        for (int i = 0; i < addedMessages.length; i++) {
102                            forceDestinationWakeupOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
103                        }
104                        for (int i = 0; i < aks.length; i++) {
105                            forceDestinationWakeupOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
106                        }
107                        transaction.setState(Transaction.PREPARED_STATE);
108                        registerMBean(transaction);
109                        LOG.debug("recovered prepared transaction: {}", transaction.getTransactionId());
110                    } catch (Throwable e) {
111                        throw new WrappedException(e);
112                    }
113                }
114            });
115        } catch (WrappedException e) {
116            Throwable cause = e.getCause();
117            throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
118        }
119        next.start();
120    }
121
122    private void registerMBean(XATransaction transaction) {
123        if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker ) {
124            ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker();
125            managedRegionBroker.registerRecoveredTransactionMBean(transaction);
126        }
127    }
128
129    private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
130                                                    ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
131        Destination destination =  addDestination(context, amqDestination, false);
132        registerSync(destination, transaction, ack);
133    }
134
135    private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
136        Synchronization sync = new PreparedDestinationCompletion(destination, command.isMessage());
137        // ensure one per destination in the list
138        Synchronization existing = transaction.findMatching(sync);
139        if (existing != null) {
140           ((PreparedDestinationCompletion)existing).incrementOpCount();
141        } else {
142            transaction.addSynchronization(sync);
143        }
144    }
145
146    static class PreparedDestinationCompletion extends Synchronization {
147        final Destination destination;
148        final boolean messageSend;
149        int opCount = 1;
150        public PreparedDestinationCompletion(final Destination destination, boolean messageSend) {
151            this.destination = destination;
152            // rollback relevant to acks, commit to sends
153            this.messageSend = messageSend;
154        }
155
156        public void incrementOpCount() {
157            opCount++;
158        }
159
160        @Override
161        public int hashCode() {
162            return System.identityHashCode(destination) +
163                    System.identityHashCode(Boolean.valueOf(messageSend));
164        }
165
166        @Override
167        public boolean equals(Object other) {
168            return other instanceof PreparedDestinationCompletion &&
169                    destination.equals(((PreparedDestinationCompletion) other).destination) &&
170                    messageSend == ((PreparedDestinationCompletion) other).messageSend;
171        }
172
173        @Override
174        public void afterRollback() throws Exception {
175            if (!messageSend) {
176                destination.clearPendingMessages();
177                LOG.debug("cleared pending from afterRollback: {}", destination);
178            }
179        }
180
181        @Override
182        public void afterCommit() throws Exception {
183            if (messageSend) {
184                destination.clearPendingMessages();
185                destination.getDestinationStatistics().getEnqueues().add(opCount);
186                destination.getDestinationStatistics().getMessages().add(opCount);
187                LOG.debug("cleared pending from afterCommit: {}", destination);
188            } else {
189                destination.getDestinationStatistics().getDequeues().add(opCount);
190                destination.getDestinationStatistics().getMessages().subtract(opCount);
191            }
192        }
193    }
194
195    public void stop() throws Exception {
196        transactionStore.stop();
197        next.stop();
198    }
199
200    // ////////////////////////////////////////////////////////////////////////////
201    //
202    // BrokerFilter overrides
203    //
204    // ////////////////////////////////////////////////////////////////////////////
205    public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
206        List<TransactionId> txs = new ArrayList<TransactionId>();
207        synchronized (xaTransactions) {
208            for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
209                Transaction tx = iter.next();
210                if (tx.isPrepared()) {
211                    LOG.debug("prepared transaction: {}", tx.getTransactionId());
212                    txs.add(tx.getTransactionId());
213                }
214            }
215        }
216        XATransactionId rc[] = new XATransactionId[txs.size()];
217        txs.toArray(rc);
218        LOG.debug("prepared transaction list size: {}", rc.length);
219        return rc;
220    }
221
222    public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
223        // the transaction may have already been started.
224        if (xid.isXATransaction()) {
225            XATransaction transaction = null;
226            synchronized (xaTransactions) {
227                transaction = xaTransactions.get(xid);
228                if (transaction != null) {
229                    return;
230                }
231                transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId());
232                xaTransactions.put(xid, transaction);
233            }
234        } else {
235            Map<TransactionId, Transaction> transactionMap = context.getTransactions();
236            Transaction transaction = transactionMap.get(xid);
237            if (transaction != null) {
238                throw new JMSException("Transaction '" + xid + "' has already been started.");
239            }
240            transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
241            transactionMap.put(xid, transaction);
242        }
243    }
244
245    public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
246        Transaction transaction = getTransaction(context, xid, false);
247        return transaction.prepare();
248    }
249
250    public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
251        Transaction transaction = getTransaction(context, xid, true);
252        transaction.commit(onePhase);
253    }
254
255    public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
256        Transaction transaction = getTransaction(context, xid, true);
257        transaction.rollback();
258    }
259
260    public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
261        Transaction transaction = getTransaction(context, xid, true);
262        transaction.rollback();
263    }
264
265    public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
266        // This method may be invoked recursively.
267        // Track original tx so that it can be restored.
268        final ConnectionContext context = consumerExchange.getConnectionContext();
269        Transaction originalTx = context.getTransaction();
270        Transaction transaction = null;
271        if (ack.isInTransaction()) {
272            transaction = getTransaction(context, ack.getTransactionId(), false);
273        }
274        context.setTransaction(transaction);
275        try {
276            next.acknowledge(consumerExchange, ack);
277        } finally {
278            context.setTransaction(originalTx);
279        }
280    }
281
282    public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
283        // This method may be invoked recursively.
284        // Track original tx so that it can be restored.
285        final ConnectionContext context = producerExchange.getConnectionContext();
286        Transaction originalTx = context.getTransaction();
287        Transaction transaction = null;
288        if (message.getTransactionId() != null) {
289            transaction = getTransaction(context, message.getTransactionId(), false);
290        }
291        context.setTransaction(transaction);
292        try {
293            next.send(producerExchange, message);
294        } finally {
295            context.setTransaction(originalTx);
296        }
297    }
298
299    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
300        for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
301            try {
302                Transaction transaction = iter.next();
303                transaction.rollback();
304            } catch (Exception e) {
305                LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
306            }
307            iter.remove();
308        }
309
310        synchronized (xaTransactions) {
311            // first find all txs that belongs to the connection
312            ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
313            for (XATransaction tx : xaTransactions.values()) {
314                if (tx.getConnectionId() != null && tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
315                    txs.add(tx);
316                }
317            }
318
319            // then remove them
320            // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
321            for (XATransaction tx : txs) {
322                try {
323                    tx.rollback();
324                } catch (Exception e) {
325                    LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
326                }
327            }
328
329        }
330        next.removeConnection(context, info, error);
331    }
332
333    // ////////////////////////////////////////////////////////////////////////////
334    //
335    // Implementation help methods.
336    //
337    // ////////////////////////////////////////////////////////////////////////////
338    public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
339        Transaction transaction = null;
340        if (xid.isXATransaction()) {
341            synchronized (xaTransactions) {
342                transaction = xaTransactions.get(xid);
343            }
344        } else {
345            transaction = context.getTransactions().get(xid);
346        }
347        if (transaction != null) {
348            return transaction;
349        }
350        if (xid.isXATransaction()) {
351            XAException e = XATransaction.newXAException("Transaction '" + xid + "' has not been started.", XAException.XAER_NOTA);
352            throw e;
353        } else {
354            throw new JMSException("Transaction '" + xid + "' has not been started.");
355        }
356    }
357
358    public void removeTransaction(XATransactionId xid) {
359        synchronized (xaTransactions) {
360            xaTransactions.remove(xid);
361        }
362    }
363
364}