001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.state;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.LinkedHashMap;
022    import java.util.Map;
023    import java.util.Vector;
024    import java.util.Map.Entry;
025    import java.util.concurrent.ConcurrentHashMap;
026    
027    import javax.jms.TransactionRolledBackException;
028    import javax.transaction.xa.XAResource;
029    
030    import org.apache.activemq.command.Command;
031    import org.apache.activemq.command.ConnectionId;
032    import org.apache.activemq.command.ConnectionInfo;
033    import org.apache.activemq.command.ConsumerControl;
034    import org.apache.activemq.command.ConsumerId;
035    import org.apache.activemq.command.ConsumerInfo;
036    import org.apache.activemq.command.DestinationInfo;
037    import org.apache.activemq.command.ExceptionResponse;
038    import org.apache.activemq.command.IntegerResponse;
039    import org.apache.activemq.command.Message;
040    import org.apache.activemq.command.MessagePull;
041    import org.apache.activemq.command.ProducerId;
042    import org.apache.activemq.command.ProducerInfo;
043    import org.apache.activemq.command.Response;
044    import org.apache.activemq.command.SessionId;
045    import org.apache.activemq.command.SessionInfo;
046    import org.apache.activemq.command.TransactionInfo;
047    import org.apache.activemq.transport.Transport;
048    import org.apache.activemq.util.IOExceptionSupport;
049    import org.slf4j.Logger;
050    import org.slf4j.LoggerFactory;
051    
052    /**
053     * Tracks the state of a connection so a newly established transport can be
054     * re-initialized to the state that was tracked.
055     * 
056     * 
057     */
058    public class ConnectionStateTracker extends CommandVisitorAdapter {
059        private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
060    
061        private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
062        private static final int MESSAGE_PULL_SIZE = 400;
063        protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); 
064    
065        private boolean trackTransactions;
066        private boolean restoreSessions = true;
067        private boolean restoreConsumers = true;
068        private boolean restoreProducers = true;
069        private boolean restoreTransaction = true;
070        private boolean trackMessages = true;
071        private boolean trackTransactionProducers = true;
072        private int maxCacheSize = 128 * 1024;
073        private int currentCacheSize;
074        private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
075            protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
076                boolean result = currentCacheSize > maxCacheSize;
077                if (result) {
078                    if (eldest.getValue() instanceof Message) {
079                        currentCacheSize -= ((Message)eldest.getValue()).getSize();
080                    } else if (eldest.getValue() instanceof MessagePull) {
081                        currentCacheSize -= MESSAGE_PULL_SIZE;
082                    }
083                    if (LOG.isTraceEnabled()) {
084                        LOG.trace("removing tracked message: " + eldest.getKey());
085                    }
086                }
087                return result;
088            }
089        };
090        
091        private class RemoveTransactionAction implements ResponseHandler {
092            private final TransactionInfo info;
093    
094            public RemoveTransactionAction(TransactionInfo info) {
095                this.info = info;
096            }
097    
098            public void onResponse(Command response) {
099                ConnectionId connectionId = info.getConnectionId();
100                ConnectionState cs = connectionStates.get(connectionId);
101                if (cs != null) {
102                    cs.removeTransactionState(info.getTransactionId());
103                }
104            }
105        }
106        
107        private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
108    
109            public PrepareReadonlyTransactionAction(TransactionInfo info) {
110                super(info);
111            }
112    
113            public void onResponse(Command command) {
114                IntegerResponse response = (IntegerResponse) command;
115                if (XAResource.XA_RDONLY == response.getResult()) {
116                    // all done, no commit or rollback from TM
117                    super.onResponse(command);
118                }
119            }
120        }
121    
122        /**
123         * 
124         * 
125         * @param command
126         * @return null if the command is not state tracked.
127         * @throws IOException
128         */
129        public Tracked track(Command command) throws IOException {
130            try {
131                return (Tracked)command.visit(this);
132            } catch (IOException e) {
133                throw e;
134            } catch (Throwable e) {
135                throw IOExceptionSupport.create(e);
136            }
137        }
138        
139        public void trackBack(Command command) {
140            if (command != null) {
141                if (trackMessages && command.isMessage()) {
142                    Message message = (Message) command;
143                    if (message.getTransactionId()==null) {
144                        currentCacheSize = currentCacheSize +  message.getSize();
145                    }
146                } else if (command instanceof MessagePull) {
147                    // just needs to be a rough estimate of size, ~4 identifiers
148                    currentCacheSize += MESSAGE_PULL_SIZE;
149                }
150            }
151        }
152    
153        public void restore(Transport transport) throws IOException {
154            // Restore the connections.
155            for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
156                ConnectionState connectionState = iter.next();
157                connectionState.getInfo().setFailoverReconnect(true);
158                if (LOG.isDebugEnabled()) {
159                    LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
160                }
161                transport.oneway(connectionState.getInfo());
162                restoreTempDestinations(transport, connectionState);
163    
164                if (restoreSessions) {
165                    restoreSessions(transport, connectionState);
166                }
167    
168                if (restoreTransaction) {
169                    restoreTransactions(transport, connectionState);
170                }
171            }
172            //now flush messages
173            for (Command msg:messageCache.values()) {
174                if (LOG.isDebugEnabled()) {
175                    LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
176                }
177                transport.oneway(msg);
178            }
179        }
180    
181        private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
182            Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
183            for (TransactionState transactionState : connectionState.getTransactionStates()) {
184                if (LOG.isDebugEnabled()) {
185                    LOG.debug("tx: " + transactionState.getId());
186                }
187                
188                // rollback any completed transactions - no way to know if commit got there
189                // or if reply went missing
190                //
191                if (!transactionState.getCommands().isEmpty()) {
192                    Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
193                    if (lastCommand instanceof TransactionInfo) {
194                        TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
195                        if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
196                            if (LOG.isDebugEnabled()) {
197                                LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
198                            }
199                            toRollback.add(transactionInfo);
200                            continue;
201                        }
202                    }
203                }
204                
205                // replay short lived producers that may have been involved in the transaction
206                for (ProducerState producerState : transactionState.getProducerStates().values()) {
207                    if (LOG.isDebugEnabled()) {
208                        LOG.debug("tx replay producer :" + producerState.getInfo());
209                    }
210                    transport.oneway(producerState.getInfo());
211                }
212                
213                for (Command command : transactionState.getCommands()) {
214                    if (LOG.isDebugEnabled()) {
215                        LOG.debug("tx replay: " + command);
216                    }
217                    transport.oneway(command);
218                }
219                
220                for (ProducerState producerState : transactionState.getProducerStates().values()) {
221                    if (LOG.isDebugEnabled()) {
222                        LOG.debug("tx remove replayed producer :" + producerState.getInfo());
223                    }
224                    transport.oneway(producerState.getInfo().createRemoveCommand());
225                }
226            }
227            
228            for (TransactionInfo command: toRollback) {
229                // respond to the outstanding commit
230                ExceptionResponse response = new ExceptionResponse();
231                response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
232                response.setCorrelationId(command.getCommandId());
233                transport.getTransportListener().onCommand(response);
234            }
235        }
236    
237        /**
238         * @param transport
239         * @param connectionState
240         * @throws IOException
241         */
242        protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
243            // Restore the connection's sessions
244            for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
245                SessionState sessionState = (SessionState)iter2.next();
246                if (LOG.isDebugEnabled()) {
247                    LOG.debug("session: " + sessionState.getInfo().getSessionId());
248                }
249                transport.oneway(sessionState.getInfo());
250    
251                if (restoreProducers) {
252                    restoreProducers(transport, sessionState);
253                }
254    
255                if (restoreConsumers) {
256                    restoreConsumers(transport, sessionState);
257                }
258            }
259        }
260    
261        /**
262         * @param transport
263         * @param sessionState
264         * @throws IOException
265         */
266        protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
267            // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
268            final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
269            final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
270            for (ConsumerState consumerState : sessionState.getConsumerStates()) {   
271                ConsumerInfo infoToSend = consumerState.getInfo();
272                if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
273                    infoToSend = consumerState.getInfo().copy();
274                    connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
275                    infoToSend.setPrefetchSize(0);
276                    if (LOG.isDebugEnabled()) {
277                        LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
278                    }
279                }
280                if (LOG.isDebugEnabled()) {
281                    LOG.debug("restore consumer: " + infoToSend.getConsumerId());
282                }
283                transport.oneway(infoToSend);
284            }
285        }
286    
287        /**
288         * @param transport
289         * @param sessionState
290         * @throws IOException
291         */
292        protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
293            // Restore the session's producers
294            for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
295                ProducerState producerState = (ProducerState)iter3.next();
296                if (LOG.isDebugEnabled()) {
297                    LOG.debug("producer: " + producerState.getInfo().getProducerId());
298                }
299                transport.oneway(producerState.getInfo());
300            }
301        }
302    
303        /**
304         * @param transport
305         * @param connectionState
306         * @throws IOException
307         */
308        protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
309            throws IOException {
310            // Restore the connection's temp destinations.
311            for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
312                DestinationInfo info = (DestinationInfo)iter2.next();
313                transport.oneway(info);
314                if (LOG.isDebugEnabled()) {
315                    LOG.debug("tempDest: " + info.getDestination());
316                }
317            }
318        }
319    
320        public Response processAddDestination(DestinationInfo info) {
321            if (info != null) {
322                ConnectionState cs = connectionStates.get(info.getConnectionId());
323                if (cs != null && info.getDestination().isTemporary()) {
324                    cs.addTempDestination(info);
325                }
326            }
327            return TRACKED_RESPONSE_MARKER;
328        }
329    
330        public Response processRemoveDestination(DestinationInfo info) {
331            if (info != null) {
332                ConnectionState cs = connectionStates.get(info.getConnectionId());
333                if (cs != null && info.getDestination().isTemporary()) {
334                    cs.removeTempDestination(info.getDestination());
335                }
336            }
337            return TRACKED_RESPONSE_MARKER;
338        }
339    
340        public Response processAddProducer(ProducerInfo info) {
341            if (info != null && info.getProducerId() != null) {
342                SessionId sessionId = info.getProducerId().getParentId();
343                if (sessionId != null) {
344                    ConnectionId connectionId = sessionId.getParentId();
345                    if (connectionId != null) {
346                        ConnectionState cs = connectionStates.get(connectionId);
347                        if (cs != null) {
348                            SessionState ss = cs.getSessionState(sessionId);
349                            if (ss != null) {
350                                ss.addProducer(info);
351                            }
352                        }
353                    }
354                }
355            }
356            return TRACKED_RESPONSE_MARKER;
357        }
358    
359        public Response processRemoveProducer(ProducerId id) {
360            if (id != null) {
361                SessionId sessionId = id.getParentId();
362                if (sessionId != null) {
363                    ConnectionId connectionId = sessionId.getParentId();
364                    if (connectionId != null) {
365                        ConnectionState cs = connectionStates.get(connectionId);
366                        if (cs != null) {
367                            SessionState ss = cs.getSessionState(sessionId);
368                            if (ss != null) {
369                                ss.removeProducer(id);
370                            }
371                        }
372                    }
373                }
374            }
375            return TRACKED_RESPONSE_MARKER;
376        }
377    
378        public Response processAddConsumer(ConsumerInfo info) {
379            if (info != null) {
380                SessionId sessionId = info.getConsumerId().getParentId();
381                if (sessionId != null) {
382                    ConnectionId connectionId = sessionId.getParentId();
383                    if (connectionId != null) {
384                        ConnectionState cs = connectionStates.get(connectionId);
385                        if (cs != null) {
386                            SessionState ss = cs.getSessionState(sessionId);
387                            if (ss != null) {
388                                ss.addConsumer(info);
389                            }
390                        }
391                    }
392                }
393            }
394            return TRACKED_RESPONSE_MARKER;
395        }
396    
397        public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
398            if (id != null) {
399                SessionId sessionId = id.getParentId();
400                if (sessionId != null) {
401                    ConnectionId connectionId = sessionId.getParentId();
402                    if (connectionId != null) {
403                        ConnectionState cs = connectionStates.get(connectionId);
404                        if (cs != null) {
405                            SessionState ss = cs.getSessionState(sessionId);
406                            if (ss != null) {
407                                ss.removeConsumer(id);
408                            }
409                        }
410                    }
411                }
412            }
413            return TRACKED_RESPONSE_MARKER;
414        }
415    
416        public Response processAddSession(SessionInfo info) {
417            if (info != null) {
418                ConnectionId connectionId = info.getSessionId().getParentId();
419                if (connectionId != null) {
420                    ConnectionState cs = connectionStates.get(connectionId);
421                    if (cs != null) {
422                        cs.addSession(info);
423                    }
424                }
425            }
426            return TRACKED_RESPONSE_MARKER;
427        }
428    
429        public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
430            if (id != null) {
431                ConnectionId connectionId = id.getParentId();
432                if (connectionId != null) {
433                    ConnectionState cs = connectionStates.get(connectionId);
434                    if (cs != null) {
435                        cs.removeSession(id);
436                    }
437                }
438            }
439            return TRACKED_RESPONSE_MARKER;
440        }
441    
442        public Response processAddConnection(ConnectionInfo info) {
443            if (info != null) {
444                connectionStates.put(info.getConnectionId(), new ConnectionState(info));
445            }
446            return TRACKED_RESPONSE_MARKER;
447        }
448    
449        public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
450            if (id != null) {
451                connectionStates.remove(id);
452            }
453            return TRACKED_RESPONSE_MARKER;
454        }
455    
456        public Response processMessage(Message send) throws Exception {
457            if (send != null) {
458                if (trackTransactions && send.getTransactionId() != null) {
459                    ProducerId producerId = send.getProducerId();
460                    ConnectionId connectionId = producerId.getParentId().getParentId();
461                    if (connectionId != null) {
462                        ConnectionState cs = connectionStates.get(connectionId);
463                        if (cs != null) {
464                            TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
465                            if (transactionState != null) {
466                                transactionState.addCommand(send);
467                                
468                                if (trackTransactionProducers) {
469                                    // for jmstemplate, track the producer in case it is closed before commit
470                                    // and needs to be replayed
471                                    SessionState ss = cs.getSessionState(producerId.getParentId());
472                                    ProducerState producerState = ss.getProducerState(producerId);
473                                    producerState.setTransactionState(transactionState);            
474                                }
475                            }
476                        }
477                    }
478                    return TRACKED_RESPONSE_MARKER;
479                }else if (trackMessages) {
480                    messageCache.put(send.getMessageId(), send);
481                }
482            }
483            return null;
484        }
485    
486        public Response processBeginTransaction(TransactionInfo info) {
487            if (trackTransactions && info != null && info.getTransactionId() != null) {
488                ConnectionId connectionId = info.getConnectionId();
489                if (connectionId != null) {
490                    ConnectionState cs = connectionStates.get(connectionId);
491                    if (cs != null) {
492                        cs.addTransactionState(info.getTransactionId());
493                        TransactionState state = cs.getTransactionState(info.getTransactionId());
494                        state.addCommand(info);
495                    }
496                }
497                return TRACKED_RESPONSE_MARKER;
498            }
499            return null;
500        }
501    
502        public Response processPrepareTransaction(TransactionInfo info) throws Exception {
503            if (trackTransactions && info != null) {
504                ConnectionId connectionId = info.getConnectionId();
505                if (connectionId != null) {
506                    ConnectionState cs = connectionStates.get(connectionId);
507                    if (cs != null) {
508                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
509                        if (transactionState != null) {
510                            transactionState.addCommand(info);
511                            return new Tracked(new PrepareReadonlyTransactionAction(info));
512                        }
513                    }
514                }
515            }
516            return null;
517        }
518    
519        public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
520            if (trackTransactions && info != null) {
521                ConnectionId connectionId = info.getConnectionId();
522                if (connectionId != null) {
523                    ConnectionState cs = connectionStates.get(connectionId);
524                    if (cs != null) {
525                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
526                        if (transactionState != null) {
527                            transactionState.addCommand(info);
528                            return new Tracked(new RemoveTransactionAction(info));
529                        }
530                    }
531                }
532            }
533            return null;
534        }
535    
536        public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
537            if (trackTransactions && info != null) {
538                ConnectionId connectionId = info.getConnectionId();
539                if (connectionId != null) {
540                    ConnectionState cs = connectionStates.get(connectionId);
541                    if (cs != null) {
542                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
543                        if (transactionState != null) {
544                            transactionState.addCommand(info);
545                            return new Tracked(new RemoveTransactionAction(info));
546                        }
547                    }
548                }
549            }
550            return null;
551        }
552    
553        public Response processRollbackTransaction(TransactionInfo info) throws Exception {
554            if (trackTransactions && info != null) {
555                ConnectionId connectionId = info.getConnectionId();
556                if (connectionId != null) {
557                    ConnectionState cs = connectionStates.get(connectionId);
558                    if (cs != null) {
559                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
560                        if (transactionState != null) {
561                            transactionState.addCommand(info);
562                            return new Tracked(new RemoveTransactionAction(info));
563                        }
564                    }
565                }
566            }
567            return null;
568        }
569    
570        public Response processEndTransaction(TransactionInfo info) throws Exception {
571            if (trackTransactions && info != null) {
572                ConnectionId connectionId = info.getConnectionId();
573                if (connectionId != null) {
574                    ConnectionState cs = connectionStates.get(connectionId);
575                    if (cs != null) {
576                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
577                        if (transactionState != null) {
578                            transactionState.addCommand(info);
579                        }
580                    }
581                }
582                return TRACKED_RESPONSE_MARKER;
583            }
584            return null;
585        }
586    
587        @Override
588        public Response processMessagePull(MessagePull pull) throws Exception {
589            if (pull != null) {
590                // leave a single instance in the cache
591                final String id = pull.getDestination() + "::" + pull.getConsumerId();
592                messageCache.put(id.intern(), pull);
593            }
594            return null;
595        }
596    
597        public boolean isRestoreConsumers() {
598            return restoreConsumers;
599        }
600    
601        public void setRestoreConsumers(boolean restoreConsumers) {
602            this.restoreConsumers = restoreConsumers;
603        }
604    
605        public boolean isRestoreProducers() {
606            return restoreProducers;
607        }
608    
609        public void setRestoreProducers(boolean restoreProducers) {
610            this.restoreProducers = restoreProducers;
611        }
612    
613        public boolean isRestoreSessions() {
614            return restoreSessions;
615        }
616    
617        public void setRestoreSessions(boolean restoreSessions) {
618            this.restoreSessions = restoreSessions;
619        }
620    
621        public boolean isTrackTransactions() {
622            return trackTransactions;
623        }
624    
625        public void setTrackTransactions(boolean trackTransactions) {
626            this.trackTransactions = trackTransactions;
627        }
628        
629        public boolean isTrackTransactionProducers() {
630            return this.trackTransactionProducers;
631        }
632    
633        public void setTrackTransactionProducers(boolean trackTransactionProducers) {
634            this.trackTransactionProducers = trackTransactionProducers;
635        }
636        
637        public boolean isRestoreTransaction() {
638            return restoreTransaction;
639        }
640    
641        public void setRestoreTransaction(boolean restoreTransaction) {
642            this.restoreTransaction = restoreTransaction;
643        }
644    
645        public boolean isTrackMessages() {
646            return trackMessages;
647        }
648    
649        public void setTrackMessages(boolean trackMessages) {
650            this.trackMessages = trackMessages;
651        }
652    
653        public int getMaxCacheSize() {
654            return maxCacheSize;
655        }
656    
657        public void setMaxCacheSize(int maxCacheSize) {
658            this.maxCacheSize = maxCacheSize;
659        }
660    
661        public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
662            ConnectionState connectionState = connectionStates.get(connectionId);
663            if (connectionState != null) {
664                connectionState.setConnectionInterruptProcessingComplete(true);
665                Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
666                for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
667                    ConsumerControl control = new ConsumerControl();
668                    control.setConsumerId(entry.getKey());
669                    control.setPrefetch(entry.getValue().getPrefetchSize());
670                    control.setDestination(entry.getValue().getDestination());
671                    try {
672                        if (LOG.isDebugEnabled()) {
673                            LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
674                        }
675                        transport.oneway(control);  
676                    } catch (Exception ex) {
677                        if (LOG.isDebugEnabled()) {
678                            LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
679                                    + " with: " + control.getPrefetch(), ex);
680                        }
681                    }
682                }
683                stalledConsumers.clear();
684            }
685        }
686    
687        public void transportInterrupted(ConnectionId connectionId) {
688            ConnectionState connectionState = connectionStates.get(connectionId);
689            if (connectionState != null) {
690                connectionState.setConnectionInterruptProcessingComplete(false);
691            }
692        }
693    }