001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.state;
018
019 import java.io.IOException;
020 import java.util.Iterator;
021 import java.util.LinkedHashMap;
022 import java.util.Map;
023 import java.util.Vector;
024 import java.util.Map.Entry;
025 import java.util.concurrent.ConcurrentHashMap;
026
027 import javax.jms.TransactionRolledBackException;
028 import javax.transaction.xa.XAResource;
029
030 import org.apache.activemq.command.Command;
031 import org.apache.activemq.command.ConnectionId;
032 import org.apache.activemq.command.ConnectionInfo;
033 import org.apache.activemq.command.ConsumerControl;
034 import org.apache.activemq.command.ConsumerId;
035 import org.apache.activemq.command.ConsumerInfo;
036 import org.apache.activemq.command.DestinationInfo;
037 import org.apache.activemq.command.ExceptionResponse;
038 import org.apache.activemq.command.IntegerResponse;
039 import org.apache.activemq.command.Message;
040 import org.apache.activemq.command.MessagePull;
041 import org.apache.activemq.command.ProducerId;
042 import org.apache.activemq.command.ProducerInfo;
043 import org.apache.activemq.command.Response;
044 import org.apache.activemq.command.SessionId;
045 import org.apache.activemq.command.SessionInfo;
046 import org.apache.activemq.command.TransactionInfo;
047 import org.apache.activemq.transport.Transport;
048 import org.apache.activemq.util.IOExceptionSupport;
049 import org.slf4j.Logger;
050 import org.slf4j.LoggerFactory;
051
052 /**
053 * Tracks the state of a connection so a newly established transport can be
054 * re-initialized to the state that was tracked.
055 *
056 *
057 */
058 public class ConnectionStateTracker extends CommandVisitorAdapter {
059 private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
060
061 private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
062 private static final int MESSAGE_PULL_SIZE = 400;
063 protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
064
065 private boolean trackTransactions;
066 private boolean restoreSessions = true;
067 private boolean restoreConsumers = true;
068 private boolean restoreProducers = true;
069 private boolean restoreTransaction = true;
070 private boolean trackMessages = true;
071 private boolean trackTransactionProducers = true;
072 private int maxCacheSize = 128 * 1024;
073 private int currentCacheSize;
074 private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
075 protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
076 boolean result = currentCacheSize > maxCacheSize;
077 if (result) {
078 if (eldest.getValue() instanceof Message) {
079 currentCacheSize -= ((Message)eldest.getValue()).getSize();
080 } else if (eldest.getValue() instanceof MessagePull) {
081 currentCacheSize -= MESSAGE_PULL_SIZE;
082 }
083 if (LOG.isTraceEnabled()) {
084 LOG.trace("removing tracked message: " + eldest.getKey());
085 }
086 }
087 return result;
088 }
089 };
090
091 private class RemoveTransactionAction implements ResponseHandler {
092 private final TransactionInfo info;
093
094 public RemoveTransactionAction(TransactionInfo info) {
095 this.info = info;
096 }
097
098 public void onResponse(Command response) {
099 ConnectionId connectionId = info.getConnectionId();
100 ConnectionState cs = connectionStates.get(connectionId);
101 cs.removeTransactionState(info.getTransactionId());
102 }
103 }
104
105 private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
106
107 public PrepareReadonlyTransactionAction(TransactionInfo info) {
108 super(info);
109 }
110
111 public void onResponse(Command command) {
112 IntegerResponse response = (IntegerResponse) command;
113 if (XAResource.XA_RDONLY == response.getResult()) {
114 // all done, no commit or rollback from TM
115 super.onResponse(command);
116 }
117 }
118 }
119
120 /**
121 *
122 *
123 * @param command
124 * @return null if the command is not state tracked.
125 * @throws IOException
126 */
127 public Tracked track(Command command) throws IOException {
128 try {
129 return (Tracked)command.visit(this);
130 } catch (IOException e) {
131 throw e;
132 } catch (Throwable e) {
133 throw IOExceptionSupport.create(e);
134 }
135 }
136
137 public void trackBack(Command command) {
138 if (command != null) {
139 if (trackMessages && command.isMessage()) {
140 Message message = (Message) command;
141 if (message.getTransactionId()==null) {
142 currentCacheSize = currentCacheSize + message.getSize();
143 }
144 } else if (command instanceof MessagePull) {
145 // just needs to be a rough estimate of size, ~4 identifiers
146 currentCacheSize += MESSAGE_PULL_SIZE;
147 }
148 }
149 }
150
151 public void restore(Transport transport) throws IOException {
152 // Restore the connections.
153 for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
154 ConnectionState connectionState = iter.next();
155 connectionState.getInfo().setFailoverReconnect(true);
156 if (LOG.isDebugEnabled()) {
157 LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
158 }
159 transport.oneway(connectionState.getInfo());
160 restoreTempDestinations(transport, connectionState);
161
162 if (restoreSessions) {
163 restoreSessions(transport, connectionState);
164 }
165
166 if (restoreTransaction) {
167 restoreTransactions(transport, connectionState);
168 }
169 }
170 //now flush messages
171 for (Command msg:messageCache.values()) {
172 if (LOG.isDebugEnabled()) {
173 LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
174 }
175 transport.oneway(msg);
176 }
177 }
178
179 private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
180 Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
181 for (TransactionState transactionState : connectionState.getTransactionStates()) {
182 if (LOG.isDebugEnabled()) {
183 LOG.debug("tx: " + transactionState.getId());
184 }
185
186 // rollback any completed transactions - no way to know if commit got there
187 // or if reply went missing
188 //
189 if (!transactionState.getCommands().isEmpty()) {
190 Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
191 if (lastCommand instanceof TransactionInfo) {
192 TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
193 if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
194 if (LOG.isDebugEnabled()) {
195 LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
196 }
197 toRollback.add(transactionInfo);
198 continue;
199 }
200 }
201 }
202
203 // replay short lived producers that may have been involved in the transaction
204 for (ProducerState producerState : transactionState.getProducerStates().values()) {
205 if (LOG.isDebugEnabled()) {
206 LOG.debug("tx replay producer :" + producerState.getInfo());
207 }
208 transport.oneway(producerState.getInfo());
209 }
210
211 for (Command command : transactionState.getCommands()) {
212 if (LOG.isDebugEnabled()) {
213 LOG.debug("tx replay: " + command);
214 }
215 transport.oneway(command);
216 }
217
218 for (ProducerState producerState : transactionState.getProducerStates().values()) {
219 if (LOG.isDebugEnabled()) {
220 LOG.debug("tx remove replayed producer :" + producerState.getInfo());
221 }
222 transport.oneway(producerState.getInfo().createRemoveCommand());
223 }
224 }
225
226 for (TransactionInfo command: toRollback) {
227 // respond to the outstanding commit
228 ExceptionResponse response = new ExceptionResponse();
229 response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
230 response.setCorrelationId(command.getCommandId());
231 transport.getTransportListener().onCommand(response);
232 }
233 }
234
235 /**
236 * @param transport
237 * @param connectionState
238 * @throws IOException
239 */
240 protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
241 // Restore the connection's sessions
242 for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
243 SessionState sessionState = (SessionState)iter2.next();
244 if (LOG.isDebugEnabled()) {
245 LOG.debug("session: " + sessionState.getInfo().getSessionId());
246 }
247 transport.oneway(sessionState.getInfo());
248
249 if (restoreProducers) {
250 restoreProducers(transport, sessionState);
251 }
252
253 if (restoreConsumers) {
254 restoreConsumers(transport, sessionState);
255 }
256 }
257 }
258
259 /**
260 * @param transport
261 * @param sessionState
262 * @throws IOException
263 */
264 protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
265 // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
266 final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
267 final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
268 for (ConsumerState consumerState : sessionState.getConsumerStates()) {
269 ConsumerInfo infoToSend = consumerState.getInfo();
270 if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
271 infoToSend = consumerState.getInfo().copy();
272 connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
273 infoToSend.setPrefetchSize(0);
274 if (LOG.isDebugEnabled()) {
275 LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
276 }
277 }
278 if (LOG.isDebugEnabled()) {
279 LOG.debug("restore consumer: " + infoToSend.getConsumerId());
280 }
281 transport.oneway(infoToSend);
282 }
283 }
284
285 /**
286 * @param transport
287 * @param sessionState
288 * @throws IOException
289 */
290 protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
291 // Restore the session's producers
292 for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
293 ProducerState producerState = (ProducerState)iter3.next();
294 if (LOG.isDebugEnabled()) {
295 LOG.debug("producer: " + producerState.getInfo().getProducerId());
296 }
297 transport.oneway(producerState.getInfo());
298 }
299 }
300
301 /**
302 * @param transport
303 * @param connectionState
304 * @throws IOException
305 */
306 protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
307 throws IOException {
308 // Restore the connection's temp destinations.
309 for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
310 DestinationInfo info = (DestinationInfo)iter2.next();
311 transport.oneway(info);
312 if (LOG.isDebugEnabled()) {
313 LOG.debug("tempDest: " + info.getDestination());
314 }
315 }
316 }
317
318 public Response processAddDestination(DestinationInfo info) {
319 if (info != null) {
320 ConnectionState cs = connectionStates.get(info.getConnectionId());
321 if (cs != null && info.getDestination().isTemporary()) {
322 cs.addTempDestination(info);
323 }
324 }
325 return TRACKED_RESPONSE_MARKER;
326 }
327
328 public Response processRemoveDestination(DestinationInfo info) {
329 if (info != null) {
330 ConnectionState cs = connectionStates.get(info.getConnectionId());
331 if (cs != null && info.getDestination().isTemporary()) {
332 cs.removeTempDestination(info.getDestination());
333 }
334 }
335 return TRACKED_RESPONSE_MARKER;
336 }
337
338 public Response processAddProducer(ProducerInfo info) {
339 if (info != null && info.getProducerId() != null) {
340 SessionId sessionId = info.getProducerId().getParentId();
341 if (sessionId != null) {
342 ConnectionId connectionId = sessionId.getParentId();
343 if (connectionId != null) {
344 ConnectionState cs = connectionStates.get(connectionId);
345 if (cs != null) {
346 SessionState ss = cs.getSessionState(sessionId);
347 if (ss != null) {
348 ss.addProducer(info);
349 }
350 }
351 }
352 }
353 }
354 return TRACKED_RESPONSE_MARKER;
355 }
356
357 public Response processRemoveProducer(ProducerId id) {
358 if (id != null) {
359 SessionId sessionId = id.getParentId();
360 if (sessionId != null) {
361 ConnectionId connectionId = sessionId.getParentId();
362 if (connectionId != null) {
363 ConnectionState cs = connectionStates.get(connectionId);
364 if (cs != null) {
365 SessionState ss = cs.getSessionState(sessionId);
366 if (ss != null) {
367 ss.removeProducer(id);
368 }
369 }
370 }
371 }
372 }
373 return TRACKED_RESPONSE_MARKER;
374 }
375
376 public Response processAddConsumer(ConsumerInfo info) {
377 if (info != null) {
378 SessionId sessionId = info.getConsumerId().getParentId();
379 if (sessionId != null) {
380 ConnectionId connectionId = sessionId.getParentId();
381 if (connectionId != null) {
382 ConnectionState cs = connectionStates.get(connectionId);
383 if (cs != null) {
384 SessionState ss = cs.getSessionState(sessionId);
385 if (ss != null) {
386 ss.addConsumer(info);
387 }
388 }
389 }
390 }
391 }
392 return TRACKED_RESPONSE_MARKER;
393 }
394
395 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
396 if (id != null) {
397 SessionId sessionId = id.getParentId();
398 if (sessionId != null) {
399 ConnectionId connectionId = sessionId.getParentId();
400 if (connectionId != null) {
401 ConnectionState cs = connectionStates.get(connectionId);
402 if (cs != null) {
403 SessionState ss = cs.getSessionState(sessionId);
404 if (ss != null) {
405 ss.removeConsumer(id);
406 }
407 }
408 }
409 }
410 }
411 return TRACKED_RESPONSE_MARKER;
412 }
413
414 public Response processAddSession(SessionInfo info) {
415 if (info != null) {
416 ConnectionId connectionId = info.getSessionId().getParentId();
417 if (connectionId != null) {
418 ConnectionState cs = connectionStates.get(connectionId);
419 if (cs != null) {
420 cs.addSession(info);
421 }
422 }
423 }
424 return TRACKED_RESPONSE_MARKER;
425 }
426
427 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
428 if (id != null) {
429 ConnectionId connectionId = id.getParentId();
430 if (connectionId != null) {
431 ConnectionState cs = connectionStates.get(connectionId);
432 if (cs != null) {
433 cs.removeSession(id);
434 }
435 }
436 }
437 return TRACKED_RESPONSE_MARKER;
438 }
439
440 public Response processAddConnection(ConnectionInfo info) {
441 if (info != null) {
442 connectionStates.put(info.getConnectionId(), new ConnectionState(info));
443 }
444 return TRACKED_RESPONSE_MARKER;
445 }
446
447 public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
448 if (id != null) {
449 connectionStates.remove(id);
450 }
451 return TRACKED_RESPONSE_MARKER;
452 }
453
454 public Response processMessage(Message send) throws Exception {
455 if (send != null) {
456 if (trackTransactions && send.getTransactionId() != null) {
457 ProducerId producerId = send.getProducerId();
458 ConnectionId connectionId = producerId.getParentId().getParentId();
459 if (connectionId != null) {
460 ConnectionState cs = connectionStates.get(connectionId);
461 if (cs != null) {
462 TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
463 if (transactionState != null) {
464 transactionState.addCommand(send);
465
466 if (trackTransactionProducers) {
467 // for jmstemplate, track the producer in case it is closed before commit
468 // and needs to be replayed
469 SessionState ss = cs.getSessionState(producerId.getParentId());
470 ProducerState producerState = ss.getProducerState(producerId);
471 producerState.setTransactionState(transactionState);
472 }
473 }
474 }
475 }
476 return TRACKED_RESPONSE_MARKER;
477 }else if (trackMessages) {
478 messageCache.put(send.getMessageId(), send);
479 }
480 }
481 return null;
482 }
483
484 public Response processBeginTransaction(TransactionInfo info) {
485 if (trackTransactions && info != null && info.getTransactionId() != null) {
486 ConnectionId connectionId = info.getConnectionId();
487 if (connectionId != null) {
488 ConnectionState cs = connectionStates.get(connectionId);
489 if (cs != null) {
490 cs.addTransactionState(info.getTransactionId());
491 TransactionState state = cs.getTransactionState(info.getTransactionId());
492 state.addCommand(info);
493 }
494 }
495 return TRACKED_RESPONSE_MARKER;
496 }
497 return null;
498 }
499
500 public Response processPrepareTransaction(TransactionInfo info) throws Exception {
501 if (trackTransactions && info != null) {
502 ConnectionId connectionId = info.getConnectionId();
503 if (connectionId != null) {
504 ConnectionState cs = connectionStates.get(connectionId);
505 if (cs != null) {
506 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
507 if (transactionState != null) {
508 transactionState.addCommand(info);
509 return new Tracked(new PrepareReadonlyTransactionAction(info));
510 }
511 }
512 }
513 }
514 return null;
515 }
516
517 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
518 if (trackTransactions && info != null) {
519 ConnectionId connectionId = info.getConnectionId();
520 if (connectionId != null) {
521 ConnectionState cs = connectionStates.get(connectionId);
522 if (cs != null) {
523 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
524 if (transactionState != null) {
525 transactionState.addCommand(info);
526 return new Tracked(new RemoveTransactionAction(info));
527 }
528 }
529 }
530 }
531 return null;
532 }
533
534 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
535 if (trackTransactions && info != null) {
536 ConnectionId connectionId = info.getConnectionId();
537 if (connectionId != null) {
538 ConnectionState cs = connectionStates.get(connectionId);
539 if (cs != null) {
540 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
541 if (transactionState != null) {
542 transactionState.addCommand(info);
543 return new Tracked(new RemoveTransactionAction(info));
544 }
545 }
546 }
547 }
548 return null;
549 }
550
551 public Response processRollbackTransaction(TransactionInfo info) throws Exception {
552 if (trackTransactions && info != null) {
553 ConnectionId connectionId = info.getConnectionId();
554 if (connectionId != null) {
555 ConnectionState cs = connectionStates.get(connectionId);
556 if (cs != null) {
557 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
558 if (transactionState != null) {
559 transactionState.addCommand(info);
560 return new Tracked(new RemoveTransactionAction(info));
561 }
562 }
563 }
564 }
565 return null;
566 }
567
568 public Response processEndTransaction(TransactionInfo info) throws Exception {
569 if (trackTransactions && info != null) {
570 ConnectionId connectionId = info.getConnectionId();
571 if (connectionId != null) {
572 ConnectionState cs = connectionStates.get(connectionId);
573 if (cs != null) {
574 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
575 if (transactionState != null) {
576 transactionState.addCommand(info);
577 }
578 }
579 }
580 return TRACKED_RESPONSE_MARKER;
581 }
582 return null;
583 }
584
585 @Override
586 public Response processMessagePull(MessagePull pull) throws Exception {
587 if (pull != null) {
588 // leave a single instance in the cache
589 final String id = pull.getDestination() + "::" + pull.getConsumerId();
590 messageCache.put(id.intern(), pull);
591 }
592 return null;
593 }
594
595 public boolean isRestoreConsumers() {
596 return restoreConsumers;
597 }
598
599 public void setRestoreConsumers(boolean restoreConsumers) {
600 this.restoreConsumers = restoreConsumers;
601 }
602
603 public boolean isRestoreProducers() {
604 return restoreProducers;
605 }
606
607 public void setRestoreProducers(boolean restoreProducers) {
608 this.restoreProducers = restoreProducers;
609 }
610
611 public boolean isRestoreSessions() {
612 return restoreSessions;
613 }
614
615 public void setRestoreSessions(boolean restoreSessions) {
616 this.restoreSessions = restoreSessions;
617 }
618
619 public boolean isTrackTransactions() {
620 return trackTransactions;
621 }
622
623 public void setTrackTransactions(boolean trackTransactions) {
624 this.trackTransactions = trackTransactions;
625 }
626
627 public boolean isTrackTransactionProducers() {
628 return this.trackTransactionProducers;
629 }
630
631 public void setTrackTransactionProducers(boolean trackTransactionProducers) {
632 this.trackTransactionProducers = trackTransactionProducers;
633 }
634
635 public boolean isRestoreTransaction() {
636 return restoreTransaction;
637 }
638
639 public void setRestoreTransaction(boolean restoreTransaction) {
640 this.restoreTransaction = restoreTransaction;
641 }
642
643 public boolean isTrackMessages() {
644 return trackMessages;
645 }
646
647 public void setTrackMessages(boolean trackMessages) {
648 this.trackMessages = trackMessages;
649 }
650
651 public int getMaxCacheSize() {
652 return maxCacheSize;
653 }
654
655 public void setMaxCacheSize(int maxCacheSize) {
656 this.maxCacheSize = maxCacheSize;
657 }
658
659 public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
660 ConnectionState connectionState = connectionStates.get(connectionId);
661 if (connectionState != null) {
662 connectionState.setConnectionInterruptProcessingComplete(true);
663 Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
664 for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
665 ConsumerControl control = new ConsumerControl();
666 control.setConsumerId(entry.getKey());
667 control.setPrefetch(entry.getValue().getPrefetchSize());
668 control.setDestination(entry.getValue().getDestination());
669 try {
670 if (LOG.isDebugEnabled()) {
671 LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
672 }
673 transport.oneway(control);
674 } catch (Exception ex) {
675 if (LOG.isDebugEnabled()) {
676 LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
677 + " with: " + control.getPrefetch(), ex);
678 }
679 }
680 }
681 stalledConsumers.clear();
682 }
683 }
684
685 public void transportInterrupted(ConnectionId connectionId) {
686 ConnectionState connectionState = connectionStates.get(connectionId);
687 if (connectionState != null) {
688 connectionState.setConnectionInterruptProcessingComplete(false);
689 }
690 }
691 }