001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    
020    import java.util.ArrayList;
021    import java.util.Iterator;
022    import java.util.LinkedHashMap;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.concurrent.ConcurrentHashMap;
026    
027    import javax.jms.JMSException;
028    import javax.transaction.xa.XAException;
029    
030    import org.apache.activemq.ActiveMQMessageAudit;
031    import org.apache.activemq.broker.jmx.ManagedRegionBroker;
032    import org.apache.activemq.broker.region.Destination;
033    import org.apache.activemq.command.ActiveMQDestination;
034    import org.apache.activemq.command.BaseCommand;
035    import org.apache.activemq.command.ConnectionInfo;
036    import org.apache.activemq.command.LocalTransactionId;
037    import org.apache.activemq.command.Message;
038    import org.apache.activemq.command.MessageAck;
039    import org.apache.activemq.command.ProducerInfo;
040    import org.apache.activemq.command.TransactionId;
041    import org.apache.activemq.command.XATransactionId;
042    import org.apache.activemq.state.ProducerState;
043    import org.apache.activemq.store.TransactionRecoveryListener;
044    import org.apache.activemq.store.TransactionStore;
045    import org.apache.activemq.transaction.LocalTransaction;
046    import org.apache.activemq.transaction.Synchronization;
047    import org.apache.activemq.transaction.Transaction;
048    import org.apache.activemq.transaction.XATransaction;
049    import org.apache.activemq.util.IOExceptionSupport;
050    import org.apache.activemq.util.WrappedException;
051    import org.slf4j.Logger;
052    import org.slf4j.LoggerFactory;
053    
054    /**
055     * This broker filter handles the transaction related operations in the Broker
056     * interface.
057     * 
058     * 
059     */
060    public class TransactionBroker extends BrokerFilter {
061    
062        private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class);
063    
064        // The prepared XA transactions.
065        private TransactionStore transactionStore;
066        private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
067        private ActiveMQMessageAudit audit;
068    
069        public TransactionBroker(Broker next, TransactionStore transactionStore) {
070            super(next);
071            this.transactionStore = transactionStore;
072        }
073    
074        // ////////////////////////////////////////////////////////////////////////////
075        //
076        // Life cycle Methods
077        //
078        // ////////////////////////////////////////////////////////////////////////////
079    
080        /**
081         * Recovers any prepared transactions.
082         */
083        public void start() throws Exception {
084            transactionStore.start();
085            try {
086                final ConnectionContext context = new ConnectionContext();
087                context.setBroker(this);
088                context.setInRecoveryMode(true);
089                context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
090                context.setProducerFlowControl(false);
091                final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
092                producerExchange.setMutable(true);
093                producerExchange.setConnectionContext(context);
094                producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
095                final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
096                consumerExchange.setConnectionContext(context);
097                transactionStore.recover(new TransactionRecoveryListener() {
098                    public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
099                        try {
100                            beginTransaction(context, xid);
101                            XATransaction transaction = (XATransaction) getTransaction(context, xid, false);
102                            for (int i = 0; i < addedMessages.length; i++) {
103                                forceDestinationWakeupOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
104                            }
105                            for (int i = 0; i < aks.length; i++) {
106                                forceDestinationWakeupOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
107                            }
108                            transaction.setState(Transaction.PREPARED_STATE);
109                            registerMBean(transaction);
110                            if (LOG.isDebugEnabled()) {
111                                LOG.debug("recovered prepared transaction: " + transaction.getTransactionId());
112                            }
113                        } catch (Throwable e) {
114                            throw new WrappedException(e);
115                        }
116                    }
117                });
118            } catch (WrappedException e) {
119                Throwable cause = e.getCause();
120                throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
121            }
122            next.start();
123        }
124    
125        private void registerMBean(XATransaction transaction) {
126            if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker ) {
127                ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker();
128                managedRegionBroker.registerRecoveredTransactionMBean(transaction);
129            }
130        }
131    
132        private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
133                                                        ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
134            Destination destination =  addDestination(context, amqDestination, false);
135            registerSync(destination, transaction, ack);
136        }
137    
138        private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
139            Synchronization sync = new PreparedDestinationCompletion(destination, command.isMessage());
140            // ensure one per destination in the list
141            Synchronization existing = transaction.findMatching(sync);
142            if (existing != null) {
143               ((PreparedDestinationCompletion)existing).incrementOpCount();
144            } else {
145                transaction.addSynchronization(sync);
146            }
147        }
148    
149        static class PreparedDestinationCompletion extends Synchronization {
150            final Destination destination;
151            final boolean messageSend;
152            int opCount = 1;
153            public PreparedDestinationCompletion(final Destination destination, boolean messageSend) {
154                this.destination = destination;
155                // rollback relevant to acks, commit to sends
156                this.messageSend = messageSend;
157            }
158    
159            public void incrementOpCount() {
160                opCount++;
161            }
162    
163            @Override
164            public int hashCode() {
165                return System.identityHashCode(destination) +
166                        System.identityHashCode(Boolean.valueOf(messageSend));
167            }
168    
169            @Override
170            public boolean equals(Object other) {
171                return other instanceof PreparedDestinationCompletion &&
172                        destination.equals(((PreparedDestinationCompletion) other).destination) &&
173                        messageSend == ((PreparedDestinationCompletion) other).messageSend;
174            }
175    
176            @Override
177            public void afterRollback() throws Exception {
178                if (!messageSend) {
179                    destination.clearPendingMessages();
180                    if (LOG.isDebugEnabled()) {
181                        LOG.debug("cleared pending from afterRollback : " + destination);
182                    }
183                }
184            }
185    
186            @Override
187            public void afterCommit() throws Exception {
188                if (messageSend) {
189                    destination.clearPendingMessages();
190                    destination.getDestinationStatistics().getEnqueues().add(opCount);
191                    destination.getDestinationStatistics().getMessages().add(opCount);
192                    if (LOG.isDebugEnabled()) {
193                        LOG.debug("cleared pending from afterCommit : " + destination);
194                    }
195                } else {
196                    destination.getDestinationStatistics().getDequeues().add(opCount);
197                    destination.getDestinationStatistics().getMessages().subtract(opCount);
198                }
199            }
200        }
201    
202        public void stop() throws Exception {
203            transactionStore.stop();
204            next.stop();
205        }
206    
207        // ////////////////////////////////////////////////////////////////////////////
208        //
209        // BrokerFilter overrides
210        //
211        // ////////////////////////////////////////////////////////////////////////////
212        public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
213            List<TransactionId> txs = new ArrayList<TransactionId>();
214            synchronized (xaTransactions) {
215                for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
216                    Transaction tx = iter.next();
217                    if (tx.isPrepared()) {
218                        if (LOG.isDebugEnabled()) {
219                            LOG.debug("prepared transaction: " + tx.getTransactionId());
220                        }
221                        txs.add(tx.getTransactionId());
222                    }
223                }
224            }
225            XATransactionId rc[] = new XATransactionId[txs.size()];
226            txs.toArray(rc);
227            if (LOG.isDebugEnabled()) {
228                LOG.debug("prepared transaction list size: " + rc.length);
229            }
230            return rc;
231        }
232    
233        public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
234            // the transaction may have already been started.
235            if (xid.isXATransaction()) {
236                XATransaction transaction = null;
237                synchronized (xaTransactions) {
238                    transaction = xaTransactions.get(xid);
239                    if (transaction != null) {
240                        return;
241                    }
242                    transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId());
243                    xaTransactions.put(xid, transaction);
244                }
245            } else {
246                Map<TransactionId, Transaction> transactionMap = context.getTransactions();
247                Transaction transaction = transactionMap.get(xid);
248                if (transaction != null) {
249                    throw new JMSException("Transaction '" + xid + "' has already been started.");
250                }
251                transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
252                transactionMap.put(xid, transaction);
253            }
254        }
255    
256        public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
257            Transaction transaction = getTransaction(context, xid, false);
258            return transaction.prepare();
259        }
260    
261        public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
262            Transaction transaction = getTransaction(context, xid, true);
263            transaction.commit(onePhase);
264        }
265    
266        public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
267            Transaction transaction = getTransaction(context, xid, true);
268            transaction.rollback();
269        }
270    
271        public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
272            Transaction transaction = getTransaction(context, xid, true);
273            transaction.rollback();
274        }
275    
276        public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
277            // This method may be invoked recursively.
278            // Track original tx so that it can be restored.
279            final ConnectionContext context = consumerExchange.getConnectionContext();
280            Transaction originalTx = context.getTransaction();
281            Transaction transaction = null;
282            if (ack.isInTransaction()) {
283                transaction = getTransaction(context, ack.getTransactionId(), false);
284            }
285            context.setTransaction(transaction);
286            try {
287                next.acknowledge(consumerExchange, ack);
288            } finally {
289                context.setTransaction(originalTx);
290            }
291        }
292    
293        public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
294            // This method may be invoked recursively.
295            // Track original tx so that it can be restored.
296            final ConnectionContext context = producerExchange.getConnectionContext();
297            Transaction originalTx = context.getTransaction();
298            Transaction transaction = null;
299            Synchronization sync = null;
300            if (message.getTransactionId() != null) {
301                transaction = getTransaction(context, message.getTransactionId(), false);
302                if (transaction != null) {
303                    sync = new Synchronization() {
304    
305                        public void afterRollback() {
306                            if (audit != null) {
307                                audit.rollback(message);
308                            }
309                        }
310                    };
311                    transaction.addSynchronization(sync);
312                }
313            }
314            if (audit == null || !audit.isDuplicate(message)) {
315                context.setTransaction(transaction);
316                try {
317                    next.send(producerExchange, message);
318                } finally {
319                    context.setTransaction(originalTx);
320                }
321            } else {
322                if (sync != null && transaction != null) {
323                    transaction.removeSynchronization(sync);
324                }
325                if (LOG.isDebugEnabled()) {
326                    LOG.debug("IGNORING duplicate message " + message);
327                }
328            }
329        }
330    
331        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
332            for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
333                try {
334                    Transaction transaction = iter.next();
335                    transaction.rollback();
336                } catch (Exception e) {
337                    LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
338                }
339                iter.remove();
340            }
341    
342            synchronized (xaTransactions) {
343                // first find all txs that belongs to the connection
344                ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
345                for (XATransaction tx : xaTransactions.values()) {
346                    if (tx.getConnectionId() != null && tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
347                        txs.add(tx);
348                    }
349                }
350    
351                // then remove them
352                // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
353                for (XATransaction tx : txs) {
354                    try {
355                        tx.rollback();
356                    } catch (Exception e) {
357                        LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
358                    }
359                }
360    
361            }
362            next.removeConnection(context, info, error);
363        }
364    
365        // ////////////////////////////////////////////////////////////////////////////
366        //
367        // Implementation help methods.
368        //
369        // ////////////////////////////////////////////////////////////////////////////
370        public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
371            Map transactionMap = null;
372            synchronized (xaTransactions) {
373                transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions();
374            }
375            Transaction transaction = (Transaction)transactionMap.get(xid);
376            if (transaction != null) {
377                return transaction;
378            }
379            if (xid.isXATransaction()) {
380                XAException e = new XAException("Transaction '" + xid + "' has not been started.");
381                e.errorCode = XAException.XAER_NOTA;
382                throw e;
383            } else {
384                throw new JMSException("Transaction '" + xid + "' has not been started.");
385            }
386        }
387    
388        public void removeTransaction(XATransactionId xid) {
389            synchronized (xaTransactions) {
390                xaTransactions.remove(xid);
391            }
392        }
393    
394        public synchronized void brokerServiceStarted() {
395            super.brokerServiceStarted();
396            if (getBrokerService().isSupportFailOver() && audit == null) {
397                audit = new ActiveMQMessageAudit();
398            }
399        }
400    
401    }