001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.state;
018
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.TransactionRolledBackException;
import javax.transaction.xa.XAResource;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IOExceptionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
051
052 /**
053 * Tracks the state of a connection so a newly established transport can be
054 * re-initialized to the state that was tracked.
055 *
056 *
057 */
058 public class ConnectionStateTracker extends CommandVisitorAdapter {
059 private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
060
061 private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
062 private static final int MESSAGE_PULL_SIZE = 400;
063 protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
064
065 private boolean trackTransactions;
066 private boolean restoreSessions = true;
067 private boolean restoreConsumers = true;
068 private boolean restoreProducers = true;
069 private boolean restoreTransaction = true;
070 private boolean trackMessages = true;
071 private boolean trackTransactionProducers = true;
072 private int maxCacheSize = 128 * 1024;
073 private int currentCacheSize;
074 private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
075 protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
076 boolean result = currentCacheSize > maxCacheSize;
077 if (result) {
078 if (eldest.getValue() instanceof Message) {
079 currentCacheSize -= ((Message)eldest.getValue()).getSize();
080 } else if (eldest.getValue() instanceof MessagePull) {
081 currentCacheSize -= MESSAGE_PULL_SIZE;
082 }
083 if (LOG.isTraceEnabled()) {
084 LOG.trace("removing tracked message: " + eldest.getKey());
085 }
086 }
087 return result;
088 }
089 };
090
091 private class RemoveTransactionAction implements ResponseHandler {
092 private final TransactionInfo info;
093
094 public RemoveTransactionAction(TransactionInfo info) {
095 this.info = info;
096 }
097
098 public void onResponse(Command response) {
099 ConnectionId connectionId = info.getConnectionId();
100 ConnectionState cs = connectionStates.get(connectionId);
101 if (cs != null) {
102 cs.removeTransactionState(info.getTransactionId());
103 }
104 }
105 }
106
107 private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
108
109 public PrepareReadonlyTransactionAction(TransactionInfo info) {
110 super(info);
111 }
112
113 public void onResponse(Command command) {
114 IntegerResponse response = (IntegerResponse) command;
115 if (XAResource.XA_RDONLY == response.getResult()) {
116 // all done, no commit or rollback from TM
117 super.onResponse(command);
118 }
119 }
120 }
121
122 /**
123 *
124 *
125 * @param command
126 * @return null if the command is not state tracked.
127 * @throws IOException
128 */
129 public Tracked track(Command command) throws IOException {
130 try {
131 return (Tracked)command.visit(this);
132 } catch (IOException e) {
133 throw e;
134 } catch (Throwable e) {
135 throw IOExceptionSupport.create(e);
136 }
137 }
138
139 public void trackBack(Command command) {
140 if (command != null) {
141 if (trackMessages && command.isMessage()) {
142 Message message = (Message) command;
143 if (message.getTransactionId()==null) {
144 currentCacheSize = currentCacheSize + message.getSize();
145 }
146 } else if (command instanceof MessagePull) {
147 // just needs to be a rough estimate of size, ~4 identifiers
148 currentCacheSize += MESSAGE_PULL_SIZE;
149 }
150 }
151 }
152
153 public void restore(Transport transport) throws IOException {
154 // Restore the connections.
155 for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
156 ConnectionState connectionState = iter.next();
157 connectionState.getInfo().setFailoverReconnect(true);
158 if (LOG.isDebugEnabled()) {
159 LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
160 }
161 transport.oneway(connectionState.getInfo());
162 restoreTempDestinations(transport, connectionState);
163
164 if (restoreSessions) {
165 restoreSessions(transport, connectionState);
166 }
167
168 if (restoreTransaction) {
169 restoreTransactions(transport, connectionState);
170 }
171 }
172 //now flush messages
173 for (Command msg:messageCache.values()) {
174 if (LOG.isDebugEnabled()) {
175 LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
176 }
177 transport.oneway(msg);
178 }
179 }
180
181 private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
182 Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
183 for (TransactionState transactionState : connectionState.getTransactionStates()) {
184 if (LOG.isDebugEnabled()) {
185 LOG.debug("tx: " + transactionState.getId());
186 }
187
188 // rollback any completed transactions - no way to know if commit got there
189 // or if reply went missing
190 //
191 if (!transactionState.getCommands().isEmpty()) {
192 Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
193 if (lastCommand instanceof TransactionInfo) {
194 TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
195 if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
196 if (LOG.isDebugEnabled()) {
197 LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
198 }
199 toRollback.add(transactionInfo);
200 continue;
201 }
202 }
203 }
204
205 // replay short lived producers that may have been involved in the transaction
206 for (ProducerState producerState : transactionState.getProducerStates().values()) {
207 if (LOG.isDebugEnabled()) {
208 LOG.debug("tx replay producer :" + producerState.getInfo());
209 }
210 transport.oneway(producerState.getInfo());
211 }
212
213 for (Command command : transactionState.getCommands()) {
214 if (LOG.isDebugEnabled()) {
215 LOG.debug("tx replay: " + command);
216 }
217 transport.oneway(command);
218 }
219
220 for (ProducerState producerState : transactionState.getProducerStates().values()) {
221 if (LOG.isDebugEnabled()) {
222 LOG.debug("tx remove replayed producer :" + producerState.getInfo());
223 }
224 transport.oneway(producerState.getInfo().createRemoveCommand());
225 }
226 }
227
228 for (TransactionInfo command: toRollback) {
229 // respond to the outstanding commit
230 ExceptionResponse response = new ExceptionResponse();
231 response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
232 response.setCorrelationId(command.getCommandId());
233 transport.getTransportListener().onCommand(response);
234 }
235 }
236
237 /**
238 * @param transport
239 * @param connectionState
240 * @throws IOException
241 */
242 protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
243 // Restore the connection's sessions
244 for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
245 SessionState sessionState = (SessionState)iter2.next();
246 if (LOG.isDebugEnabled()) {
247 LOG.debug("session: " + sessionState.getInfo().getSessionId());
248 }
249 transport.oneway(sessionState.getInfo());
250
251 if (restoreProducers) {
252 restoreProducers(transport, sessionState);
253 }
254
255 if (restoreConsumers) {
256 restoreConsumers(transport, sessionState);
257 }
258 }
259 }
260
261 /**
262 * @param transport
263 * @param sessionState
264 * @throws IOException
265 */
266 protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
267 // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
268 final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
269 final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
270 for (ConsumerState consumerState : sessionState.getConsumerStates()) {
271 ConsumerInfo infoToSend = consumerState.getInfo();
272 if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
273 infoToSend = consumerState.getInfo().copy();
274 connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
275 infoToSend.setPrefetchSize(0);
276 if (LOG.isDebugEnabled()) {
277 LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
278 }
279 }
280 if (LOG.isDebugEnabled()) {
281 LOG.debug("restore consumer: " + infoToSend.getConsumerId());
282 }
283 transport.oneway(infoToSend);
284 }
285 }
286
287 /**
288 * @param transport
289 * @param sessionState
290 * @throws IOException
291 */
292 protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
293 // Restore the session's producers
294 for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
295 ProducerState producerState = (ProducerState)iter3.next();
296 if (LOG.isDebugEnabled()) {
297 LOG.debug("producer: " + producerState.getInfo().getProducerId());
298 }
299 transport.oneway(producerState.getInfo());
300 }
301 }
302
303 /**
304 * @param transport
305 * @param connectionState
306 * @throws IOException
307 */
308 protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
309 throws IOException {
310 // Restore the connection's temp destinations.
311 for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
312 DestinationInfo info = (DestinationInfo)iter2.next();
313 transport.oneway(info);
314 if (LOG.isDebugEnabled()) {
315 LOG.debug("tempDest: " + info.getDestination());
316 }
317 }
318 }
319
320 public Response processAddDestination(DestinationInfo info) {
321 if (info != null) {
322 ConnectionState cs = connectionStates.get(info.getConnectionId());
323 if (cs != null && info.getDestination().isTemporary()) {
324 cs.addTempDestination(info);
325 }
326 }
327 return TRACKED_RESPONSE_MARKER;
328 }
329
330 public Response processRemoveDestination(DestinationInfo info) {
331 if (info != null) {
332 ConnectionState cs = connectionStates.get(info.getConnectionId());
333 if (cs != null && info.getDestination().isTemporary()) {
334 cs.removeTempDestination(info.getDestination());
335 }
336 }
337 return TRACKED_RESPONSE_MARKER;
338 }
339
340 public Response processAddProducer(ProducerInfo info) {
341 if (info != null && info.getProducerId() != null) {
342 SessionId sessionId = info.getProducerId().getParentId();
343 if (sessionId != null) {
344 ConnectionId connectionId = sessionId.getParentId();
345 if (connectionId != null) {
346 ConnectionState cs = connectionStates.get(connectionId);
347 if (cs != null) {
348 SessionState ss = cs.getSessionState(sessionId);
349 if (ss != null) {
350 ss.addProducer(info);
351 }
352 }
353 }
354 }
355 }
356 return TRACKED_RESPONSE_MARKER;
357 }
358
359 public Response processRemoveProducer(ProducerId id) {
360 if (id != null) {
361 SessionId sessionId = id.getParentId();
362 if (sessionId != null) {
363 ConnectionId connectionId = sessionId.getParentId();
364 if (connectionId != null) {
365 ConnectionState cs = connectionStates.get(connectionId);
366 if (cs != null) {
367 SessionState ss = cs.getSessionState(sessionId);
368 if (ss != null) {
369 ss.removeProducer(id);
370 }
371 }
372 }
373 }
374 }
375 return TRACKED_RESPONSE_MARKER;
376 }
377
378 public Response processAddConsumer(ConsumerInfo info) {
379 if (info != null) {
380 SessionId sessionId = info.getConsumerId().getParentId();
381 if (sessionId != null) {
382 ConnectionId connectionId = sessionId.getParentId();
383 if (connectionId != null) {
384 ConnectionState cs = connectionStates.get(connectionId);
385 if (cs != null) {
386 SessionState ss = cs.getSessionState(sessionId);
387 if (ss != null) {
388 ss.addConsumer(info);
389 }
390 }
391 }
392 }
393 }
394 return TRACKED_RESPONSE_MARKER;
395 }
396
397 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
398 if (id != null) {
399 SessionId sessionId = id.getParentId();
400 if (sessionId != null) {
401 ConnectionId connectionId = sessionId.getParentId();
402 if (connectionId != null) {
403 ConnectionState cs = connectionStates.get(connectionId);
404 if (cs != null) {
405 SessionState ss = cs.getSessionState(sessionId);
406 if (ss != null) {
407 ss.removeConsumer(id);
408 }
409 }
410 }
411 }
412 }
413 return TRACKED_RESPONSE_MARKER;
414 }
415
416 public Response processAddSession(SessionInfo info) {
417 if (info != null) {
418 ConnectionId connectionId = info.getSessionId().getParentId();
419 if (connectionId != null) {
420 ConnectionState cs = connectionStates.get(connectionId);
421 if (cs != null) {
422 cs.addSession(info);
423 }
424 }
425 }
426 return TRACKED_RESPONSE_MARKER;
427 }
428
429 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
430 if (id != null) {
431 ConnectionId connectionId = id.getParentId();
432 if (connectionId != null) {
433 ConnectionState cs = connectionStates.get(connectionId);
434 if (cs != null) {
435 cs.removeSession(id);
436 }
437 }
438 }
439 return TRACKED_RESPONSE_MARKER;
440 }
441
442 public Response processAddConnection(ConnectionInfo info) {
443 if (info != null) {
444 connectionStates.put(info.getConnectionId(), new ConnectionState(info));
445 }
446 return TRACKED_RESPONSE_MARKER;
447 }
448
449 public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
450 if (id != null) {
451 connectionStates.remove(id);
452 }
453 return TRACKED_RESPONSE_MARKER;
454 }
455
456 public Response processMessage(Message send) throws Exception {
457 if (send != null) {
458 if (trackTransactions && send.getTransactionId() != null) {
459 ProducerId producerId = send.getProducerId();
460 ConnectionId connectionId = producerId.getParentId().getParentId();
461 if (connectionId != null) {
462 ConnectionState cs = connectionStates.get(connectionId);
463 if (cs != null) {
464 TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
465 if (transactionState != null) {
466 transactionState.addCommand(send);
467
468 if (trackTransactionProducers) {
469 // for jmstemplate, track the producer in case it is closed before commit
470 // and needs to be replayed
471 SessionState ss = cs.getSessionState(producerId.getParentId());
472 ProducerState producerState = ss.getProducerState(producerId);
473 producerState.setTransactionState(transactionState);
474 }
475 }
476 }
477 }
478 return TRACKED_RESPONSE_MARKER;
479 }else if (trackMessages) {
480 messageCache.put(send.getMessageId(), send);
481 }
482 }
483 return null;
484 }
485
486 public Response processBeginTransaction(TransactionInfo info) {
487 if (trackTransactions && info != null && info.getTransactionId() != null) {
488 ConnectionId connectionId = info.getConnectionId();
489 if (connectionId != null) {
490 ConnectionState cs = connectionStates.get(connectionId);
491 if (cs != null) {
492 cs.addTransactionState(info.getTransactionId());
493 TransactionState state = cs.getTransactionState(info.getTransactionId());
494 state.addCommand(info);
495 }
496 }
497 return TRACKED_RESPONSE_MARKER;
498 }
499 return null;
500 }
501
502 public Response processPrepareTransaction(TransactionInfo info) throws Exception {
503 if (trackTransactions && info != null) {
504 ConnectionId connectionId = info.getConnectionId();
505 if (connectionId != null) {
506 ConnectionState cs = connectionStates.get(connectionId);
507 if (cs != null) {
508 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
509 if (transactionState != null) {
510 transactionState.addCommand(info);
511 return new Tracked(new PrepareReadonlyTransactionAction(info));
512 }
513 }
514 }
515 }
516 return null;
517 }
518
519 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
520 if (trackTransactions && info != null) {
521 ConnectionId connectionId = info.getConnectionId();
522 if (connectionId != null) {
523 ConnectionState cs = connectionStates.get(connectionId);
524 if (cs != null) {
525 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
526 if (transactionState != null) {
527 transactionState.addCommand(info);
528 return new Tracked(new RemoveTransactionAction(info));
529 }
530 }
531 }
532 }
533 return null;
534 }
535
536 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
537 if (trackTransactions && info != null) {
538 ConnectionId connectionId = info.getConnectionId();
539 if (connectionId != null) {
540 ConnectionState cs = connectionStates.get(connectionId);
541 if (cs != null) {
542 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
543 if (transactionState != null) {
544 transactionState.addCommand(info);
545 return new Tracked(new RemoveTransactionAction(info));
546 }
547 }
548 }
549 }
550 return null;
551 }
552
553 public Response processRollbackTransaction(TransactionInfo info) throws Exception {
554 if (trackTransactions && info != null) {
555 ConnectionId connectionId = info.getConnectionId();
556 if (connectionId != null) {
557 ConnectionState cs = connectionStates.get(connectionId);
558 if (cs != null) {
559 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
560 if (transactionState != null) {
561 transactionState.addCommand(info);
562 return new Tracked(new RemoveTransactionAction(info));
563 }
564 }
565 }
566 }
567 return null;
568 }
569
570 public Response processEndTransaction(TransactionInfo info) throws Exception {
571 if (trackTransactions && info != null) {
572 ConnectionId connectionId = info.getConnectionId();
573 if (connectionId != null) {
574 ConnectionState cs = connectionStates.get(connectionId);
575 if (cs != null) {
576 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
577 if (transactionState != null) {
578 transactionState.addCommand(info);
579 }
580 }
581 }
582 return TRACKED_RESPONSE_MARKER;
583 }
584 return null;
585 }
586
587 @Override
588 public Response processMessagePull(MessagePull pull) throws Exception {
589 if (pull != null) {
590 // leave a single instance in the cache
591 final String id = pull.getDestination() + "::" + pull.getConsumerId();
592 messageCache.put(id.intern(), pull);
593 }
594 return null;
595 }
596
597 public boolean isRestoreConsumers() {
598 return restoreConsumers;
599 }
600
601 public void setRestoreConsumers(boolean restoreConsumers) {
602 this.restoreConsumers = restoreConsumers;
603 }
604
605 public boolean isRestoreProducers() {
606 return restoreProducers;
607 }
608
609 public void setRestoreProducers(boolean restoreProducers) {
610 this.restoreProducers = restoreProducers;
611 }
612
613 public boolean isRestoreSessions() {
614 return restoreSessions;
615 }
616
617 public void setRestoreSessions(boolean restoreSessions) {
618 this.restoreSessions = restoreSessions;
619 }
620
621 public boolean isTrackTransactions() {
622 return trackTransactions;
623 }
624
625 public void setTrackTransactions(boolean trackTransactions) {
626 this.trackTransactions = trackTransactions;
627 }
628
629 public boolean isTrackTransactionProducers() {
630 return this.trackTransactionProducers;
631 }
632
633 public void setTrackTransactionProducers(boolean trackTransactionProducers) {
634 this.trackTransactionProducers = trackTransactionProducers;
635 }
636
637 public boolean isRestoreTransaction() {
638 return restoreTransaction;
639 }
640
641 public void setRestoreTransaction(boolean restoreTransaction) {
642 this.restoreTransaction = restoreTransaction;
643 }
644
645 public boolean isTrackMessages() {
646 return trackMessages;
647 }
648
649 public void setTrackMessages(boolean trackMessages) {
650 this.trackMessages = trackMessages;
651 }
652
653 public int getMaxCacheSize() {
654 return maxCacheSize;
655 }
656
657 public void setMaxCacheSize(int maxCacheSize) {
658 this.maxCacheSize = maxCacheSize;
659 }
660
661 public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
662 ConnectionState connectionState = connectionStates.get(connectionId);
663 if (connectionState != null) {
664 connectionState.setConnectionInterruptProcessingComplete(true);
665 Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
666 for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
667 ConsumerControl control = new ConsumerControl();
668 control.setConsumerId(entry.getKey());
669 control.setPrefetch(entry.getValue().getPrefetchSize());
670 control.setDestination(entry.getValue().getDestination());
671 try {
672 if (LOG.isDebugEnabled()) {
673 LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
674 }
675 transport.oneway(control);
676 } catch (Exception ex) {
677 if (LOG.isDebugEnabled()) {
678 LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
679 + " with: " + control.getPrefetch(), ex);
680 }
681 }
682 }
683 stalledConsumers.clear();
684 }
685 }
686
687 public void transportInterrupted(ConnectionId connectionId) {
688 ConnectionState connectionState = connectionStates.get(connectionId);
689 if (connectionState != null) {
690 connectionState.setConnectionInterruptProcessingComplete(false);
691 }
692 }
693 }