001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.state;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.LinkedHashMap;
022    import java.util.Map;
023    import java.util.Vector;
024    import java.util.Map.Entry;
025    import java.util.concurrent.ConcurrentHashMap;
026    
027    import javax.jms.TransactionRolledBackException;
028    import javax.transaction.xa.XAResource;
029    
030    import org.apache.activemq.command.Command;
031    import org.apache.activemq.command.ConnectionId;
032    import org.apache.activemq.command.ConnectionInfo;
033    import org.apache.activemq.command.ConsumerControl;
034    import org.apache.activemq.command.ConsumerId;
035    import org.apache.activemq.command.ConsumerInfo;
036    import org.apache.activemq.command.DestinationInfo;
037    import org.apache.activemq.command.ExceptionResponse;
038    import org.apache.activemq.command.IntegerResponse;
039    import org.apache.activemq.command.Message;
040    import org.apache.activemq.command.MessagePull;
041    import org.apache.activemq.command.ProducerId;
042    import org.apache.activemq.command.ProducerInfo;
043    import org.apache.activemq.command.Response;
044    import org.apache.activemq.command.SessionId;
045    import org.apache.activemq.command.SessionInfo;
046    import org.apache.activemq.command.TransactionInfo;
047    import org.apache.activemq.transport.Transport;
048    import org.apache.activemq.util.IOExceptionSupport;
049    import org.slf4j.Logger;
050    import org.slf4j.LoggerFactory;
051    
052    /**
053     * Tracks the state of a connection so a newly established transport can be
054     * re-initialized to the state that was tracked.
055     * 
056     * 
057     */
058    public class ConnectionStateTracker extends CommandVisitorAdapter {
059        private static final Logger LOG = LoggerFactory.getLogger(ConnectionStateTracker.class);
060    
061        private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
062        private static final int MESSAGE_PULL_SIZE = 400;
063        protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>(); 
064    
065        private boolean trackTransactions;
066        private boolean restoreSessions = true;
067        private boolean restoreConsumers = true;
068        private boolean restoreProducers = true;
069        private boolean restoreTransaction = true;
070        private boolean trackMessages = true;
071        private boolean trackTransactionProducers = true;
072        private int maxCacheSize = 128 * 1024;
073        private int currentCacheSize;
074        private Map<Object,Command> messageCache = new LinkedHashMap<Object,Command>(){
075            protected boolean removeEldestEntry(Map.Entry<Object,Command> eldest) {
076                boolean result = currentCacheSize > maxCacheSize;
077                if (result) {
078                    if (eldest.getValue() instanceof Message) {
079                        currentCacheSize -= ((Message)eldest.getValue()).getSize();
080                    } else if (eldest.getValue() instanceof MessagePull) {
081                        currentCacheSize -= MESSAGE_PULL_SIZE;
082                    }
083                    if (LOG.isTraceEnabled()) {
084                        LOG.trace("removing tracked message: " + eldest.getKey());
085                    }
086                }
087                return result;
088            }
089        };
090        
091        private class RemoveTransactionAction implements ResponseHandler {
092            private final TransactionInfo info;
093    
094            public RemoveTransactionAction(TransactionInfo info) {
095                this.info = info;
096            }
097    
098            public void onResponse(Command response) {
099                ConnectionId connectionId = info.getConnectionId();
100                ConnectionState cs = connectionStates.get(connectionId);
101                cs.removeTransactionState(info.getTransactionId());
102            }
103        }
104        
105        private class PrepareReadonlyTransactionAction extends RemoveTransactionAction {
106    
107            public PrepareReadonlyTransactionAction(TransactionInfo info) {
108                super(info);
109            }
110    
111            public void onResponse(Command command) {
112                IntegerResponse response = (IntegerResponse) command;
113                if (XAResource.XA_RDONLY == response.getResult()) {
114                    // all done, no commit or rollback from TM
115                    super.onResponse(command);
116                }
117            }
118        }
119    
120        /**
121         * 
122         * 
123         * @param command
124         * @return null if the command is not state tracked.
125         * @throws IOException
126         */
127        public Tracked track(Command command) throws IOException {
128            try {
129                return (Tracked)command.visit(this);
130            } catch (IOException e) {
131                throw e;
132            } catch (Throwable e) {
133                throw IOExceptionSupport.create(e);
134            }
135        }
136        
137        public void trackBack(Command command) {
138            if (command != null) {
139                if (trackMessages && command.isMessage()) {
140                    Message message = (Message) command;
141                    if (message.getTransactionId()==null) {
142                        currentCacheSize = currentCacheSize +  message.getSize();
143                    }
144                } else if (command instanceof MessagePull) {
145                    // just needs to be a rough estimate of size, ~4 identifiers
146                    currentCacheSize += MESSAGE_PULL_SIZE;
147                }
148            }
149        }
150    
151        public void restore(Transport transport) throws IOException {
152            // Restore the connections.
153            for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
154                ConnectionState connectionState = iter.next();
155                connectionState.getInfo().setFailoverReconnect(true);
156                if (LOG.isDebugEnabled()) {
157                    LOG.debug("conn: " + connectionState.getInfo().getConnectionId());
158                }
159                transport.oneway(connectionState.getInfo());
160                restoreTempDestinations(transport, connectionState);
161    
162                if (restoreSessions) {
163                    restoreSessions(transport, connectionState);
164                }
165    
166                if (restoreTransaction) {
167                    restoreTransactions(transport, connectionState);
168                }
169            }
170            //now flush messages
171            for (Command msg:messageCache.values()) {
172                if (LOG.isDebugEnabled()) {
173                    LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
174                }
175                transport.oneway(msg);
176            }
177        }
178    
179        private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
180            Vector<TransactionInfo> toRollback = new Vector<TransactionInfo>();
181            for (TransactionState transactionState : connectionState.getTransactionStates()) {
182                if (LOG.isDebugEnabled()) {
183                    LOG.debug("tx: " + transactionState.getId());
184                }
185                
186                // rollback any completed transactions - no way to know if commit got there
187                // or if reply went missing
188                //
189                if (!transactionState.getCommands().isEmpty()) {
190                    Command lastCommand = transactionState.getCommands().get(transactionState.getCommands().size() - 1);
191                    if (lastCommand instanceof TransactionInfo) {
192                        TransactionInfo transactionInfo = (TransactionInfo) lastCommand;
193                        if (transactionInfo.getType() == TransactionInfo.COMMIT_ONE_PHASE) {
194                            if (LOG.isDebugEnabled()) {
195                                LOG.debug("rolling back potentially completed tx: " + transactionState.getId());
196                            }
197                            toRollback.add(transactionInfo);
198                            continue;
199                        }
200                    }
201                }
202                
203                // replay short lived producers that may have been involved in the transaction
204                for (ProducerState producerState : transactionState.getProducerStates().values()) {
205                    if (LOG.isDebugEnabled()) {
206                        LOG.debug("tx replay producer :" + producerState.getInfo());
207                    }
208                    transport.oneway(producerState.getInfo());
209                }
210                
211                for (Command command : transactionState.getCommands()) {
212                    if (LOG.isDebugEnabled()) {
213                        LOG.debug("tx replay: " + command);
214                    }
215                    transport.oneway(command);
216                }
217                
218                for (ProducerState producerState : transactionState.getProducerStates().values()) {
219                    if (LOG.isDebugEnabled()) {
220                        LOG.debug("tx remove replayed producer :" + producerState.getInfo());
221                    }
222                    transport.oneway(producerState.getInfo().createRemoveCommand());
223                }
224            }
225            
226            for (TransactionInfo command: toRollback) {
227                // respond to the outstanding commit
228                ExceptionResponse response = new ExceptionResponse();
229                response.setException(new TransactionRolledBackException("Transaction completion in doubt due to failover. Forcing rollback of " + command.getTransactionId()));
230                response.setCorrelationId(command.getCommandId());
231                transport.getTransportListener().onCommand(response);
232            }
233        }
234    
235        /**
236         * @param transport
237         * @param connectionState
238         * @throws IOException
239         */
240        protected void restoreSessions(Transport transport, ConnectionState connectionState) throws IOException {
241            // Restore the connection's sessions
242            for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
243                SessionState sessionState = (SessionState)iter2.next();
244                if (LOG.isDebugEnabled()) {
245                    LOG.debug("session: " + sessionState.getInfo().getSessionId());
246                }
247                transport.oneway(sessionState.getInfo());
248    
249                if (restoreProducers) {
250                    restoreProducers(transport, sessionState);
251                }
252    
253                if (restoreConsumers) {
254                    restoreConsumers(transport, sessionState);
255                }
256            }
257        }
258    
259        /**
260         * @param transport
261         * @param sessionState
262         * @throws IOException
263         */
264        protected void restoreConsumers(Transport transport, SessionState sessionState) throws IOException {
265            // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
266            final ConnectionState connectionState = connectionStates.get(sessionState.getInfo().getSessionId().getParentId());
267            final boolean connectionInterruptionProcessingComplete = connectionState.isConnectionInterruptProcessingComplete();
268            for (ConsumerState consumerState : sessionState.getConsumerStates()) {   
269                ConsumerInfo infoToSend = consumerState.getInfo();
270                if (!connectionInterruptionProcessingComplete && infoToSend.getPrefetchSize() > 0) {
271                    infoToSend = consumerState.getInfo().copy();
272                    connectionState.getRecoveringPullConsumers().put(infoToSend.getConsumerId(), consumerState.getInfo());
273                    infoToSend.setPrefetchSize(0);
274                    if (LOG.isDebugEnabled()) {
275                        LOG.debug("restore consumer: " + infoToSend.getConsumerId() + " in pull mode pending recovery, overriding prefetch: " + consumerState.getInfo().getPrefetchSize());
276                    }
277                }
278                if (LOG.isDebugEnabled()) {
279                    LOG.debug("restore consumer: " + infoToSend.getConsumerId());
280                }
281                transport.oneway(infoToSend);
282            }
283        }
284    
285        /**
286         * @param transport
287         * @param sessionState
288         * @throws IOException
289         */
290        protected void restoreProducers(Transport transport, SessionState sessionState) throws IOException {
291            // Restore the session's producers
292            for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
293                ProducerState producerState = (ProducerState)iter3.next();
294                if (LOG.isDebugEnabled()) {
295                    LOG.debug("producer: " + producerState.getInfo().getProducerId());
296                }
297                transport.oneway(producerState.getInfo());
298            }
299        }
300    
301        /**
302         * @param transport
303         * @param connectionState
304         * @throws IOException
305         */
306        protected void restoreTempDestinations(Transport transport, ConnectionState connectionState)
307            throws IOException {
308            // Restore the connection's temp destinations.
309            for (Iterator iter2 = connectionState.getTempDestinations().iterator(); iter2.hasNext();) {
310                DestinationInfo info = (DestinationInfo)iter2.next();
311                transport.oneway(info);
312                if (LOG.isDebugEnabled()) {
313                    LOG.debug("tempDest: " + info.getDestination());
314                }
315            }
316        }
317    
318        public Response processAddDestination(DestinationInfo info) {
319            if (info != null) {
320                ConnectionState cs = connectionStates.get(info.getConnectionId());
321                if (cs != null && info.getDestination().isTemporary()) {
322                    cs.addTempDestination(info);
323                }
324            }
325            return TRACKED_RESPONSE_MARKER;
326        }
327    
328        public Response processRemoveDestination(DestinationInfo info) {
329            if (info != null) {
330                ConnectionState cs = connectionStates.get(info.getConnectionId());
331                if (cs != null && info.getDestination().isTemporary()) {
332                    cs.removeTempDestination(info.getDestination());
333                }
334            }
335            return TRACKED_RESPONSE_MARKER;
336        }
337    
338        public Response processAddProducer(ProducerInfo info) {
339            if (info != null && info.getProducerId() != null) {
340                SessionId sessionId = info.getProducerId().getParentId();
341                if (sessionId != null) {
342                    ConnectionId connectionId = sessionId.getParentId();
343                    if (connectionId != null) {
344                        ConnectionState cs = connectionStates.get(connectionId);
345                        if (cs != null) {
346                            SessionState ss = cs.getSessionState(sessionId);
347                            if (ss != null) {
348                                ss.addProducer(info);
349                            }
350                        }
351                    }
352                }
353            }
354            return TRACKED_RESPONSE_MARKER;
355        }
356    
357        public Response processRemoveProducer(ProducerId id) {
358            if (id != null) {
359                SessionId sessionId = id.getParentId();
360                if (sessionId != null) {
361                    ConnectionId connectionId = sessionId.getParentId();
362                    if (connectionId != null) {
363                        ConnectionState cs = connectionStates.get(connectionId);
364                        if (cs != null) {
365                            SessionState ss = cs.getSessionState(sessionId);
366                            if (ss != null) {
367                                ss.removeProducer(id);
368                            }
369                        }
370                    }
371                }
372            }
373            return TRACKED_RESPONSE_MARKER;
374        }
375    
376        public Response processAddConsumer(ConsumerInfo info) {
377            if (info != null) {
378                SessionId sessionId = info.getConsumerId().getParentId();
379                if (sessionId != null) {
380                    ConnectionId connectionId = sessionId.getParentId();
381                    if (connectionId != null) {
382                        ConnectionState cs = connectionStates.get(connectionId);
383                        if (cs != null) {
384                            SessionState ss = cs.getSessionState(sessionId);
385                            if (ss != null) {
386                                ss.addConsumer(info);
387                            }
388                        }
389                    }
390                }
391            }
392            return TRACKED_RESPONSE_MARKER;
393        }
394    
395        public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) {
396            if (id != null) {
397                SessionId sessionId = id.getParentId();
398                if (sessionId != null) {
399                    ConnectionId connectionId = sessionId.getParentId();
400                    if (connectionId != null) {
401                        ConnectionState cs = connectionStates.get(connectionId);
402                        if (cs != null) {
403                            SessionState ss = cs.getSessionState(sessionId);
404                            if (ss != null) {
405                                ss.removeConsumer(id);
406                            }
407                        }
408                    }
409                }
410            }
411            return TRACKED_RESPONSE_MARKER;
412        }
413    
414        public Response processAddSession(SessionInfo info) {
415            if (info != null) {
416                ConnectionId connectionId = info.getSessionId().getParentId();
417                if (connectionId != null) {
418                    ConnectionState cs = connectionStates.get(connectionId);
419                    if (cs != null) {
420                        cs.addSession(info);
421                    }
422                }
423            }
424            return TRACKED_RESPONSE_MARKER;
425        }
426    
427        public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) {
428            if (id != null) {
429                ConnectionId connectionId = id.getParentId();
430                if (connectionId != null) {
431                    ConnectionState cs = connectionStates.get(connectionId);
432                    if (cs != null) {
433                        cs.removeSession(id);
434                    }
435                }
436            }
437            return TRACKED_RESPONSE_MARKER;
438        }
439    
440        public Response processAddConnection(ConnectionInfo info) {
441            if (info != null) {
442                connectionStates.put(info.getConnectionId(), new ConnectionState(info));
443            }
444            return TRACKED_RESPONSE_MARKER;
445        }
446    
447        public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
448            if (id != null) {
449                connectionStates.remove(id);
450            }
451            return TRACKED_RESPONSE_MARKER;
452        }
453    
454        public Response processMessage(Message send) throws Exception {
455            if (send != null) {
456                if (trackTransactions && send.getTransactionId() != null) {
457                    ProducerId producerId = send.getProducerId();
458                    ConnectionId connectionId = producerId.getParentId().getParentId();
459                    if (connectionId != null) {
460                        ConnectionState cs = connectionStates.get(connectionId);
461                        if (cs != null) {
462                            TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
463                            if (transactionState != null) {
464                                transactionState.addCommand(send);
465                                
466                                if (trackTransactionProducers) {
467                                    // for jmstemplate, track the producer in case it is closed before commit
468                                    // and needs to be replayed
469                                    SessionState ss = cs.getSessionState(producerId.getParentId());
470                                    ProducerState producerState = ss.getProducerState(producerId);
471                                    producerState.setTransactionState(transactionState);            
472                                }
473                            }
474                        }
475                    }
476                    return TRACKED_RESPONSE_MARKER;
477                }else if (trackMessages) {
478                    messageCache.put(send.getMessageId(), send);
479                }
480            }
481            return null;
482        }
483    
484        public Response processBeginTransaction(TransactionInfo info) {
485            if (trackTransactions && info != null && info.getTransactionId() != null) {
486                ConnectionId connectionId = info.getConnectionId();
487                if (connectionId != null) {
488                    ConnectionState cs = connectionStates.get(connectionId);
489                    if (cs != null) {
490                        cs.addTransactionState(info.getTransactionId());
491                        TransactionState state = cs.getTransactionState(info.getTransactionId());
492                        state.addCommand(info);
493                    }
494                }
495                return TRACKED_RESPONSE_MARKER;
496            }
497            return null;
498        }
499    
500        public Response processPrepareTransaction(TransactionInfo info) throws Exception {
501            if (trackTransactions && info != null) {
502                ConnectionId connectionId = info.getConnectionId();
503                if (connectionId != null) {
504                    ConnectionState cs = connectionStates.get(connectionId);
505                    if (cs != null) {
506                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
507                        if (transactionState != null) {
508                            transactionState.addCommand(info);
509                            return new Tracked(new PrepareReadonlyTransactionAction(info));
510                        }
511                    }
512                }
513            }
514            return null;
515        }
516    
517        public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
518            if (trackTransactions && info != null) {
519                ConnectionId connectionId = info.getConnectionId();
520                if (connectionId != null) {
521                    ConnectionState cs = connectionStates.get(connectionId);
522                    if (cs != null) {
523                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
524                        if (transactionState != null) {
525                            transactionState.addCommand(info);
526                            return new Tracked(new RemoveTransactionAction(info));
527                        }
528                    }
529                }
530            }
531            return null;
532        }
533    
534        public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
535            if (trackTransactions && info != null) {
536                ConnectionId connectionId = info.getConnectionId();
537                if (connectionId != null) {
538                    ConnectionState cs = connectionStates.get(connectionId);
539                    if (cs != null) {
540                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
541                        if (transactionState != null) {
542                            transactionState.addCommand(info);
543                            return new Tracked(new RemoveTransactionAction(info));
544                        }
545                    }
546                }
547            }
548            return null;
549        }
550    
551        public Response processRollbackTransaction(TransactionInfo info) throws Exception {
552            if (trackTransactions && info != null) {
553                ConnectionId connectionId = info.getConnectionId();
554                if (connectionId != null) {
555                    ConnectionState cs = connectionStates.get(connectionId);
556                    if (cs != null) {
557                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
558                        if (transactionState != null) {
559                            transactionState.addCommand(info);
560                            return new Tracked(new RemoveTransactionAction(info));
561                        }
562                    }
563                }
564            }
565            return null;
566        }
567    
568        public Response processEndTransaction(TransactionInfo info) throws Exception {
569            if (trackTransactions && info != null) {
570                ConnectionId connectionId = info.getConnectionId();
571                if (connectionId != null) {
572                    ConnectionState cs = connectionStates.get(connectionId);
573                    if (cs != null) {
574                        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
575                        if (transactionState != null) {
576                            transactionState.addCommand(info);
577                        }
578                    }
579                }
580                return TRACKED_RESPONSE_MARKER;
581            }
582            return null;
583        }
584    
585        @Override
586        public Response processMessagePull(MessagePull pull) throws Exception {
587            if (pull != null) {
588                // leave a single instance in the cache
589                final String id = pull.getDestination() + "::" + pull.getConsumerId();
590                messageCache.put(id.intern(), pull);
591            }
592            return null;
593        }
594    
595        public boolean isRestoreConsumers() {
596            return restoreConsumers;
597        }
598    
599        public void setRestoreConsumers(boolean restoreConsumers) {
600            this.restoreConsumers = restoreConsumers;
601        }
602    
603        public boolean isRestoreProducers() {
604            return restoreProducers;
605        }
606    
607        public void setRestoreProducers(boolean restoreProducers) {
608            this.restoreProducers = restoreProducers;
609        }
610    
611        public boolean isRestoreSessions() {
612            return restoreSessions;
613        }
614    
615        public void setRestoreSessions(boolean restoreSessions) {
616            this.restoreSessions = restoreSessions;
617        }
618    
619        public boolean isTrackTransactions() {
620            return trackTransactions;
621        }
622    
623        public void setTrackTransactions(boolean trackTransactions) {
624            this.trackTransactions = trackTransactions;
625        }
626        
627        public boolean isTrackTransactionProducers() {
628            return this.trackTransactionProducers;
629        }
630    
631        public void setTrackTransactionProducers(boolean trackTransactionProducers) {
632            this.trackTransactionProducers = trackTransactionProducers;
633        }
634        
635        public boolean isRestoreTransaction() {
636            return restoreTransaction;
637        }
638    
639        public void setRestoreTransaction(boolean restoreTransaction) {
640            this.restoreTransaction = restoreTransaction;
641        }
642    
643        public boolean isTrackMessages() {
644            return trackMessages;
645        }
646    
647        public void setTrackMessages(boolean trackMessages) {
648            this.trackMessages = trackMessages;
649        }
650    
651        public int getMaxCacheSize() {
652            return maxCacheSize;
653        }
654    
655        public void setMaxCacheSize(int maxCacheSize) {
656            this.maxCacheSize = maxCacheSize;
657        }
658    
659        public void connectionInterruptProcessingComplete(Transport transport, ConnectionId connectionId) {
660            ConnectionState connectionState = connectionStates.get(connectionId);
661            if (connectionState != null) {
662                connectionState.setConnectionInterruptProcessingComplete(true);
663                Map<ConsumerId, ConsumerInfo> stalledConsumers = connectionState.getRecoveringPullConsumers();
664                for (Entry<ConsumerId, ConsumerInfo> entry: stalledConsumers.entrySet()) {
665                    ConsumerControl control = new ConsumerControl();
666                    control.setConsumerId(entry.getKey());
667                    control.setPrefetch(entry.getValue().getPrefetchSize());
668                    control.setDestination(entry.getValue().getDestination());
669                    try {
670                        if (LOG.isDebugEnabled()) {
671                            LOG.debug("restored recovering consumer: " + control.getConsumerId() + " with: " + control.getPrefetch());
672                        }
673                        transport.oneway(control);  
674                    } catch (Exception ex) {
675                        if (LOG.isDebugEnabled()) {
676                            LOG.debug("Failed to submit control for consumer: " + control.getConsumerId()
677                                    + " with: " + control.getPrefetch(), ex);
678                        }
679                    }
680                }
681                stalledConsumers.clear();
682            }
683        }
684    
685        public void transportInterrupted(ConnectionId connectionId) {
686            ConnectionState connectionState = connectionStates.get(connectionId);
687            if (connectionState != null) {
688                connectionState.setConnectionInterruptProcessingComplete(false);
689            }
690        }
691    }