001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019
020 import java.util.ArrayList;
021 import java.util.Iterator;
022 import java.util.LinkedHashMap;
023 import java.util.List;
024 import java.util.Map;
025 import java.util.concurrent.ConcurrentHashMap;
026
027 import javax.jms.JMSException;
028 import javax.transaction.xa.XAException;
029
030 import org.apache.activemq.ActiveMQMessageAudit;
031 import org.apache.activemq.broker.jmx.ManagedRegionBroker;
032 import org.apache.activemq.broker.region.Destination;
033 import org.apache.activemq.command.ActiveMQDestination;
034 import org.apache.activemq.command.BaseCommand;
035 import org.apache.activemq.command.ConnectionInfo;
036 import org.apache.activemq.command.LocalTransactionId;
037 import org.apache.activemq.command.Message;
038 import org.apache.activemq.command.MessageAck;
039 import org.apache.activemq.command.ProducerInfo;
040 import org.apache.activemq.command.TransactionId;
041 import org.apache.activemq.command.XATransactionId;
042 import org.apache.activemq.state.ProducerState;
043 import org.apache.activemq.store.TransactionRecoveryListener;
044 import org.apache.activemq.store.TransactionStore;
045 import org.apache.activemq.transaction.LocalTransaction;
046 import org.apache.activemq.transaction.Synchronization;
047 import org.apache.activemq.transaction.Transaction;
048 import org.apache.activemq.transaction.XATransaction;
049 import org.apache.activemq.util.IOExceptionSupport;
050 import org.apache.activemq.util.WrappedException;
051 import org.slf4j.Logger;
052 import org.slf4j.LoggerFactory;
053
054 /**
055 * This broker filter handles the transaction related operations in the Broker
056 * interface.
057 *
058 *
059 */
060 public class TransactionBroker extends BrokerFilter {
061
062 private static final Logger LOG = LoggerFactory.getLogger(TransactionBroker.class);
063
064 // The prepared XA transactions.
065 private TransactionStore transactionStore;
066 private Map<TransactionId, XATransaction> xaTransactions = new LinkedHashMap<TransactionId, XATransaction>();
067 private ActiveMQMessageAudit audit;
068
/**
 * Creates a transaction filter in front of the given broker.
 *
 * @param next the broker passed to the {@link BrokerFilter} constructor
 * @param transactionStore the store this filter starts, recovers from and stops,
 *        and hands to the transactions it creates
 */
public TransactionBroker(Broker next, TransactionStore transactionStore) {
    super(next);
    this.transactionStore = transactionStore;
}
073
074 // ////////////////////////////////////////////////////////////////////////////
075 //
076 // Life cycle Methods
077 //
078 // ////////////////////////////////////////////////////////////////////////////
079
080 /**
081 * Recovers any prepared transactions.
082 */
083 public void start() throws Exception {
084 transactionStore.start();
085 try {
086 final ConnectionContext context = new ConnectionContext();
087 context.setBroker(this);
088 context.setInRecoveryMode(true);
089 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
090 context.setProducerFlowControl(false);
091 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
092 producerExchange.setMutable(true);
093 producerExchange.setConnectionContext(context);
094 producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
095 final ConsumerBrokerExchange consumerExchange = new ConsumerBrokerExchange();
096 consumerExchange.setConnectionContext(context);
097 transactionStore.recover(new TransactionRecoveryListener() {
098 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
099 try {
100 beginTransaction(context, xid);
101 XATransaction transaction = (XATransaction) getTransaction(context, xid, false);
102 for (int i = 0; i < addedMessages.length; i++) {
103 forceDestinationWakeupOnCompletion(context, transaction, addedMessages[i].getDestination(), addedMessages[i]);
104 }
105 for (int i = 0; i < aks.length; i++) {
106 forceDestinationWakeupOnCompletion(context, transaction, aks[i].getDestination(), aks[i]);
107 }
108 transaction.setState(Transaction.PREPARED_STATE);
109 registerMBean(transaction);
110 if (LOG.isDebugEnabled()) {
111 LOG.debug("recovered prepared transaction: " + transaction.getTransactionId());
112 }
113 } catch (Throwable e) {
114 throw new WrappedException(e);
115 }
116 }
117 });
118 } catch (WrappedException e) {
119 Throwable cause = e.getCause();
120 throw IOExceptionSupport.create("Recovery Failed: " + cause.getMessage(), cause);
121 }
122 next.start();
123 }
124
125 private void registerMBean(XATransaction transaction) {
126 if (getBrokerService().getRegionBroker() instanceof ManagedRegionBroker ) {
127 ManagedRegionBroker managedRegionBroker = (ManagedRegionBroker) getBrokerService().getRegionBroker();
128 managedRegionBroker.registerRecoveredTransactionMBean(transaction);
129 }
130 }
131
132 private void forceDestinationWakeupOnCompletion(ConnectionContext context, Transaction transaction,
133 ActiveMQDestination amqDestination, BaseCommand ack) throws Exception {
134 Destination destination = addDestination(context, amqDestination, false);
135 registerSync(destination, transaction, ack);
136 }
137
138 private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
139 Synchronization sync = new PreparedDestinationCompletion(destination, command.isMessage());
140 // ensure one per destination in the list
141 Synchronization existing = transaction.findMatching(sync);
142 if (existing != null) {
143 ((PreparedDestinationCompletion)existing).incrementOpCount();
144 } else {
145 transaction.addSynchronization(sync);
146 }
147 }
148
149 static class PreparedDestinationCompletion extends Synchronization {
150 final Destination destination;
151 final boolean messageSend;
152 int opCount = 1;
153 public PreparedDestinationCompletion(final Destination destination, boolean messageSend) {
154 this.destination = destination;
155 // rollback relevant to acks, commit to sends
156 this.messageSend = messageSend;
157 }
158
159 public void incrementOpCount() {
160 opCount++;
161 }
162
163 @Override
164 public int hashCode() {
165 return System.identityHashCode(destination) +
166 System.identityHashCode(Boolean.valueOf(messageSend));
167 }
168
169 @Override
170 public boolean equals(Object other) {
171 return other instanceof PreparedDestinationCompletion &&
172 destination.equals(((PreparedDestinationCompletion) other).destination) &&
173 messageSend == ((PreparedDestinationCompletion) other).messageSend;
174 }
175
176 @Override
177 public void afterRollback() throws Exception {
178 if (!messageSend) {
179 destination.clearPendingMessages();
180 if (LOG.isDebugEnabled()) {
181 LOG.debug("cleared pending from afterRollback : " + destination);
182 }
183 }
184 }
185
186 @Override
187 public void afterCommit() throws Exception {
188 if (messageSend) {
189 destination.clearPendingMessages();
190 destination.getDestinationStatistics().getEnqueues().add(opCount);
191 destination.getDestinationStatistics().getMessages().add(opCount);
192 if (LOG.isDebugEnabled()) {
193 LOG.debug("cleared pending from afterCommit : " + destination);
194 }
195 } else {
196 destination.getDestinationStatistics().getDequeues().add(opCount);
197 destination.getDestinationStatistics().getMessages().subtract(opCount);
198 }
199 }
200 }
201
202 public void stop() throws Exception {
203 transactionStore.stop();
204 next.stop();
205 }
206
207 // ////////////////////////////////////////////////////////////////////////////
208 //
209 // BrokerFilter overrides
210 //
211 // ////////////////////////////////////////////////////////////////////////////
212 public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
213 List<TransactionId> txs = new ArrayList<TransactionId>();
214 synchronized (xaTransactions) {
215 for (Iterator<XATransaction> iter = xaTransactions.values().iterator(); iter.hasNext();) {
216 Transaction tx = iter.next();
217 if (tx.isPrepared()) {
218 if (LOG.isDebugEnabled()) {
219 LOG.debug("prepared transaction: " + tx.getTransactionId());
220 }
221 txs.add(tx.getTransactionId());
222 }
223 }
224 }
225 XATransactionId rc[] = new XATransactionId[txs.size()];
226 txs.toArray(rc);
227 if (LOG.isDebugEnabled()) {
228 LOG.debug("prepared transaction list size: " + rc.length);
229 }
230 return rc;
231 }
232
233 public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
234 // the transaction may have already been started.
235 if (xid.isXATransaction()) {
236 XATransaction transaction = null;
237 synchronized (xaTransactions) {
238 transaction = xaTransactions.get(xid);
239 if (transaction != null) {
240 return;
241 }
242 transaction = new XATransaction(transactionStore, (XATransactionId)xid, this, context.getConnectionId());
243 xaTransactions.put(xid, transaction);
244 }
245 } else {
246 Map<TransactionId, Transaction> transactionMap = context.getTransactions();
247 Transaction transaction = transactionMap.get(xid);
248 if (transaction != null) {
249 throw new JMSException("Transaction '" + xid + "' has already been started.");
250 }
251 transaction = new LocalTransaction(transactionStore, (LocalTransactionId)xid, context);
252 transactionMap.put(xid, transaction);
253 }
254 }
255
256 public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
257 Transaction transaction = getTransaction(context, xid, false);
258 return transaction.prepare();
259 }
260
261 public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
262 Transaction transaction = getTransaction(context, xid, true);
263 transaction.commit(onePhase);
264 }
265
266 public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
267 Transaction transaction = getTransaction(context, xid, true);
268 transaction.rollback();
269 }
270
271 public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
272 Transaction transaction = getTransaction(context, xid, true);
273 transaction.rollback();
274 }
275
276 public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
277 // This method may be invoked recursively.
278 // Track original tx so that it can be restored.
279 final ConnectionContext context = consumerExchange.getConnectionContext();
280 Transaction originalTx = context.getTransaction();
281 Transaction transaction = null;
282 if (ack.isInTransaction()) {
283 transaction = getTransaction(context, ack.getTransactionId(), false);
284 }
285 context.setTransaction(transaction);
286 try {
287 next.acknowledge(consumerExchange, ack);
288 } finally {
289 context.setTransaction(originalTx);
290 }
291 }
292
293 public void send(ProducerBrokerExchange producerExchange, final Message message) throws Exception {
294 // This method may be invoked recursively.
295 // Track original tx so that it can be restored.
296 final ConnectionContext context = producerExchange.getConnectionContext();
297 Transaction originalTx = context.getTransaction();
298 Transaction transaction = null;
299 Synchronization sync = null;
300 if (message.getTransactionId() != null) {
301 transaction = getTransaction(context, message.getTransactionId(), false);
302 if (transaction != null) {
303 sync = new Synchronization() {
304
305 public void afterRollback() {
306 if (audit != null) {
307 audit.rollback(message);
308 }
309 }
310 };
311 transaction.addSynchronization(sync);
312 }
313 }
314 if (audit == null || !audit.isDuplicate(message)) {
315 context.setTransaction(transaction);
316 try {
317 next.send(producerExchange, message);
318 } finally {
319 context.setTransaction(originalTx);
320 }
321 } else {
322 if (sync != null && transaction != null) {
323 transaction.removeSynchronization(sync);
324 }
325 if (LOG.isDebugEnabled()) {
326 LOG.debug("IGNORING duplicate message " + message);
327 }
328 }
329 }
330
331 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
332 for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
333 try {
334 Transaction transaction = iter.next();
335 transaction.rollback();
336 } catch (Exception e) {
337 LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
338 }
339 iter.remove();
340 }
341
342 synchronized (xaTransactions) {
343 // first find all txs that belongs to the connection
344 ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
345 for (XATransaction tx : xaTransactions.values()) {
346 if (tx.getConnectionId() != null && tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
347 txs.add(tx);
348 }
349 }
350
351 // then remove them
352 // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
353 for (XATransaction tx : txs) {
354 try {
355 tx.rollback();
356 } catch (Exception e) {
357 LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
358 }
359 }
360
361 }
362 next.removeConnection(context, info, error);
363 }
364
365 // ////////////////////////////////////////////////////////////////////////////
366 //
367 // Implementation help methods.
368 //
369 // ////////////////////////////////////////////////////////////////////////////
370 public Transaction getTransaction(ConnectionContext context, TransactionId xid, boolean mightBePrepared) throws JMSException, XAException {
371 Map transactionMap = null;
372 synchronized (xaTransactions) {
373 transactionMap = xid.isXATransaction() ? xaTransactions : context.getTransactions();
374 }
375 Transaction transaction = (Transaction)transactionMap.get(xid);
376 if (transaction != null) {
377 return transaction;
378 }
379 if (xid.isXATransaction()) {
380 XAException e = new XAException("Transaction '" + xid + "' has not been started.");
381 e.errorCode = XAException.XAER_NOTA;
382 throw e;
383 } else {
384 throw new JMSException("Transaction '" + xid + "' has not been started.");
385 }
386 }
387
388 public void removeTransaction(XATransactionId xid) {
389 synchronized (xaTransactions) {
390 xaTransactions.remove(xid);
391 }
392 }
393
394 public synchronized void brokerServiceStarted() {
395 super.brokerServiceStarted();
396 if (getBrokerService().isSupportFailOver() && audit == null) {
397 audit = new ActiveMQMessageAudit();
398 }
399 }
400
401 }