001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.EOFException;
020    import java.io.IOException;
021    import java.net.SocketException;
022    import java.net.URI;
023    import java.util.HashMap;
024    import java.util.Iterator;
025    import java.util.LinkedList;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Properties;
029    import java.util.concurrent.ConcurrentHashMap;
030    import java.util.concurrent.CopyOnWriteArrayList;
031    import java.util.concurrent.CountDownLatch;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicBoolean;
034    import java.util.concurrent.atomic.AtomicInteger;
035    import java.util.concurrent.atomic.AtomicReference;
036    import java.util.concurrent.locks.ReentrantReadWriteLock;
037    
038    import javax.transaction.xa.XAResource;
039    import org.apache.activemq.advisory.AdvisorySupport;
040    import org.apache.activemq.broker.ft.MasterBroker;
041    import org.apache.activemq.broker.region.ConnectionStatistics;
042    import org.apache.activemq.broker.region.RegionBroker;
043    import org.apache.activemq.command.*;
044    import org.apache.activemq.network.DemandForwardingBridge;
045    import org.apache.activemq.network.MBeanNetworkListener;
046    import org.apache.activemq.network.NetworkBridgeConfiguration;
047    import org.apache.activemq.network.NetworkBridgeFactory;
048    import org.apache.activemq.security.MessageAuthorizationPolicy;
049    import org.apache.activemq.state.CommandVisitor;
050    import org.apache.activemq.state.ConnectionState;
051    import org.apache.activemq.state.ConsumerState;
052    import org.apache.activemq.state.ProducerState;
053    import org.apache.activemq.state.SessionState;
054    import org.apache.activemq.state.TransactionState;
055    import org.apache.activemq.thread.Task;
056    import org.apache.activemq.thread.TaskRunner;
057    import org.apache.activemq.thread.TaskRunnerFactory;
058    import org.apache.activemq.transaction.Transaction;
059    import org.apache.activemq.transport.DefaultTransportListener;
060    import org.apache.activemq.transport.ResponseCorrelator;
061    import org.apache.activemq.transport.Transport;
062    import org.apache.activemq.transport.TransportDisposedIOException;
063    import org.apache.activemq.transport.TransportFactory;
064    import org.apache.activemq.util.IntrospectionSupport;
065    import org.apache.activemq.util.MarshallingSupport;
066    import org.apache.activemq.util.ServiceSupport;
067    import org.apache.activemq.util.URISupport;
068    import org.slf4j.Logger;
069    import org.slf4j.LoggerFactory;
070    import org.slf4j.MDC;
071    
/**
 * Broker-side handler for a single {@link Transport}. Commands delivered by
 * the transport listener are passed to {@link #service(Command)}, which
 * dispatches them through {@code command.visit(this)} to the
 * {@code process*} methods of this class.
 */
public class TransportConnection implements Connection, Task, CommandVisitor {
    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
    // Dedicated logger for transport failures (see serviceTransportException).
    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
    // Dedicated logger for errors raised while servicing commands.
    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
    // Keeps track of the broker and connector that created this connection.
    protected final Broker broker;
    protected final TransportConnector connector;
    // Keeps track of the state of the connections.
    // protected final ConcurrentHashMap localConnectionStates=new
    // ConcurrentHashMap();
    // Obtained from the RegionBroker in the constructor.
    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
    // The broker and wireformat info that was exchanged.
    protected BrokerInfo brokerInfo;
    // Pending commands; access is guarded by synchronizing on the list itself.
    protected final List<Command> dispatchQueue = new LinkedList<Command>();
    protected TaskRunner taskRunner;
    // Last transport failure recorded by serviceTransportException.
    protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
    private MasterBroker masterBroker;
    private final Transport transport;
    // Taken from the connector in the constructor when a connector is supplied.
    private MessageAuthorizationPolicy messageAuthorizationPolicy;
    // Set by processWireFormat.
    private WireFormatInfo wireFormatInfo;
    // Used to do async dispatch.. this should perhaps be pushed down into the
    // transport layer..
    // NOTE(review): the comment above appears to describe the task runner
    // fields rather than the flag below - confirm and relocate.
    // True while serviceException is handling a non-IO error; prevents re-entry.
    private boolean inServiceException;
    private final ConnectionStatistics statistics = new ConnectionStatistics();
    private boolean manageable;
    private boolean slow;
    private boolean markedCandidate;
    private boolean blockedCandidate;
    private boolean blocked;
    // Set to true at the end of the constructor.
    private boolean connected;
    private boolean active;
    private boolean starting;
    // While true, service() answers commands with an ExceptionResponse
    // carrying stopError instead of processing them.
    private boolean pendingStop;
    private long timeStamp;
    // Checked by the exception handlers so errors are ignored once a stop has begun.
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final CountDownLatch stopped = new CountDownLatch(1);
    // Ensures serviceExceptionAsync spawns at most one handler thread.
    private final AtomicBoolean asyncException = new AtomicBoolean(false);
    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
    // Context of the command currently being serviced; set by the transaction
    // process* methods and cleared again at the end of service().
    private ConnectionContext context;
    private boolean networkConnection;
    private boolean faultTolerantConnection;
    // Starts at the compiled-in protocol version; updated by processWireFormat.
    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
    private DemandForwardingBridge duplexBridge;
    private final TaskRunnerFactory taskRunnerFactory;
    private final TaskRunnerFactory stopTaskRunnerFactory;
    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
    // The transport listener holds the read lock while servicing a command or exception.
    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
    private String duplexNetworkConnectorId;
    // Recorded by serviceException when the broker has been stopped.
    private Throwable stopError = null;
124    
125        /**
126         * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
127         *                          else commands are sent async.
128         * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
129         */
130        public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
131                                   TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
132            this.connector = connector;
133            this.broker = broker;
134            RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
135            brokerConnectionStates = rb.getConnectionStates();
136            if (connector != null) {
137                this.statistics.setParent(connector.getStatistics());
138                this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
139            }
140            this.taskRunnerFactory = taskRunnerFactory;
141            this.stopTaskRunnerFactory = stopTaskRunnerFactory;
142            this.transport = transport;
143            this.transport.setTransportListener(new DefaultTransportListener() {
144                @Override
145                public void onCommand(Object o) {
146                    serviceLock.readLock().lock();
147                    try {
148                        if (!(o instanceof Command)) {
149                            throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
150                        }
151                        Command command = (Command) o;
152                        Response response = service(command);
153                        if (response != null) {
154                            dispatchSync(response);
155                        }
156                    } finally {
157                        serviceLock.readLock().unlock();
158                    }
159                }
160    
161                @Override
162                public void onException(IOException exception) {
163                    serviceLock.readLock().lock();
164                    try {
165                        serviceTransportException(exception);
166                    } finally {
167                        serviceLock.readLock().unlock();
168                    }
169                }
170            });
171            connected = true;
172        }
173    
174        /**
175         * Returns the number of messages to be dispatched to this connection
176         *
177         * @return size of dispatch queue
178         */
179        public int getDispatchQueueSize() {
180            synchronized (dispatchQueue) {
181                return dispatchQueue.size();
182            }
183        }
184    
185        public void serviceTransportException(IOException e) {
186            BrokerService bService = connector.getBrokerService();
187            if (bService.isShutdownOnSlaveFailure()) {
188                if (brokerInfo != null) {
189                    if (brokerInfo.isSlaveBroker()) {
190                        LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
191                        try {
192                            doStop();
193                            bService.stop();
194                        } catch (Exception ex) {
195                            LOG.warn("Failed to stop the master", ex);
196                        }
197                    }
198                }
199            }
200            if (!stopping.get()) {
201                transportException.set(e);
202                if (TRANSPORTLOG.isDebugEnabled()) {
203                    TRANSPORTLOG.debug(this + " failed: " + e, e);
204                } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
205                    TRANSPORTLOG.warn(this + " failed: " + e);
206                }
207                stopAsync();
208            }
209        }
210    
211        private boolean expected(IOException e) {
212            return isStomp() &&
213                    ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
214        }
215    
216        private boolean isStomp() {
217            URI uri = connector.getUri();
218            return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
219        }
220    
221        /**
222         * Calls the serviceException method in an async thread. Since handling a
223         * service exception closes a socket, we should not tie up broker threads
224         * since client sockets may hang or cause deadlocks.
225         */
226        public void serviceExceptionAsync(final IOException e) {
227            if (asyncException.compareAndSet(false, true)) {
228                new Thread("Async Exception Handler") {
229                    @Override
230                    public void run() {
231                        serviceException(e);
232                    }
233                }.start();
234            }
235        }
236    
237        /**
238         * Closes a clients connection due to a detected error. Errors are ignored
239         * if: the client is closing or broker is closing. Otherwise, the connection
240         * error transmitted to the client before stopping it's transport.
241         */
242        public void serviceException(Throwable e) {
243            // are we a transport exception such as not being able to dispatch
244            // synchronously to a transport
245            if (e instanceof IOException) {
246                serviceTransportException((IOException) e);
247            } else if (e.getClass() == BrokerStoppedException.class) {
248                // Handle the case where the broker is stopped
249                // But the client is still connected.
250                if (!stopping.get()) {
251                    if (SERVICELOG.isDebugEnabled()) {
252                        SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
253                    }
254                    ConnectionError ce = new ConnectionError();
255                    ce.setException(e);
256                    dispatchSync(ce);
257                    // Record the error that caused the transport to stop
258                    this.stopError = e;
259                    // Wait a little bit to try to get the output buffer to flush
260                    // the exption notification to the client.
261                    try {
262                        Thread.sleep(500);
263                    } catch (InterruptedException ie) {
264                        Thread.currentThread().interrupt();
265                    }
266                    // Worst case is we just kill the connection before the
267                    // notification gets to him.
268                    stopAsync();
269                }
270            } else if (!stopping.get() && !inServiceException) {
271                inServiceException = true;
272                try {
273                    SERVICELOG.warn("Async error occurred: " + e, e);
274                    ConnectionError ce = new ConnectionError();
275                    ce.setException(e);
276                    if (pendingStop) {
277                        dispatchSync(ce);
278                    } else {
279                        dispatchAsync(ce);
280                    }
281                } finally {
282                    inServiceException = false;
283                }
284            }
285        }
286    
287        public Response service(Command command) {
288            MDC.put("activemq.connector", connector.getUri().toString());
289            Response response = null;
290            boolean responseRequired = command.isResponseRequired();
291            int commandId = command.getCommandId();
292            try {
293                if (!pendingStop) {
294                    response = command.visit(this);
295                } else {
296                    response = new ExceptionResponse(this.stopError);
297                }
298            } catch (Throwable e) {
299                if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
300                    SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
301                            + " command: " + command + ", exception: " + e, e);
302                }
303    
304                if (responseRequired) {
305                    response = new ExceptionResponse(e);
306                } else {
307                    serviceException(e);
308                }
309            }
310            if (responseRequired) {
311                if (response == null) {
312                    response = new Response();
313                }
314                response.setCorrelationId(commandId);
315            }
316            // The context may have been flagged so that the response is not
317            // sent.
318            if (context != null) {
319                if (context.isDontSendReponse()) {
320                    context.setDontSendReponse(false);
321                    response = null;
322                }
323                context = null;
324            }
325            MDC.remove("activemq.connector");
326            return response;
327        }
328    
329        public Response processKeepAlive(KeepAliveInfo info) throws Exception {
330            return null;
331        }
332    
333        public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
334            broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
335            return null;
336        }
337    
338        public Response processWireFormat(WireFormatInfo info) throws Exception {
339            wireFormatInfo = info;
340            protocolVersion.set(info.getVersion());
341            return null;
342        }
343    
344        public Response processShutdown(ShutdownInfo info) throws Exception {
345            stopAsync();
346            return null;
347        }
348    
349        public Response processFlush(FlushCommand command) throws Exception {
350            return null;
351        }
352    
353        public Response processBeginTransaction(TransactionInfo info) throws Exception {
354            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
355            context = null;
356            if (cs != null) {
357                context = cs.getContext();
358            }
359            if (cs == null) {
360                throw new NullPointerException("Context is null");
361            }
362            // Avoid replaying dup commands
363            if (cs.getTransactionState(info.getTransactionId()) == null) {
364                cs.addTransactionState(info.getTransactionId());
365                broker.beginTransaction(context, info.getTransactionId());
366            }
367            return null;
368        }
369    
370        public Response processEndTransaction(TransactionInfo info) throws Exception {
371            // No need to do anything. This packet is just sent by the client
372            // make sure he is synced with the server as commit command could
373            // come from a different connection.
374            return null;
375        }
376    
377        public Response processPrepareTransaction(TransactionInfo info) throws Exception {
378            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
379            context = null;
380            if (cs != null) {
381                context = cs.getContext();
382            }
383            if (cs == null) {
384                throw new NullPointerException("Context is null");
385            }
386            TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
387            if (transactionState == null) {
388                throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
389                        + info.getTransactionId());
390            }
391            // Avoid dups.
392            if (!transactionState.isPrepared()) {
393                transactionState.setPrepared(true);
394                int result = broker.prepareTransaction(context, info.getTransactionId());
395                transactionState.setPreparedResult(result);
396                if (result == XAResource.XA_RDONLY) {
397                    // we are done, no further rollback or commit from TM
398                    cs.removeTransactionState(info.getTransactionId());
399                }
400                IntegerResponse response = new IntegerResponse(result);
401                return response;
402            } else {
403                IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
404                return response;
405            }
406        }
407    
408        public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
409            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
410            context = cs.getContext();
411            cs.removeTransactionState(info.getTransactionId());
412            broker.commitTransaction(context, info.getTransactionId(), true);
413            return null;
414        }
415    
416        public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
417            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
418            context = cs.getContext();
419            cs.removeTransactionState(info.getTransactionId());
420            broker.commitTransaction(context, info.getTransactionId(), false);
421            return null;
422        }
423    
424        public Response processRollbackTransaction(TransactionInfo info) throws Exception {
425            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
426            context = cs.getContext();
427            cs.removeTransactionState(info.getTransactionId());
428            broker.rollbackTransaction(context, info.getTransactionId());
429            return null;
430        }
431    
432        public Response processForgetTransaction(TransactionInfo info) throws Exception {
433            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
434            context = cs.getContext();
435            broker.forgetTransaction(context, info.getTransactionId());
436            return null;
437        }
438    
439        public Response processRecoverTransactions(TransactionInfo info) throws Exception {
440            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
441            context = cs.getContext();
442            TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
443            return new DataArrayResponse(preparedTransactions);
444        }
445    
446        public Response processMessage(Message messageSend) throws Exception {
447            ProducerId producerId = messageSend.getProducerId();
448            ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
449            if (producerExchange.canDispatch(messageSend)) {
450                broker.send(producerExchange, messageSend);
451            }
452            return null;
453        }
454    
455        public Response processMessageAck(MessageAck ack) throws Exception {
456            ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
457            if (consumerExchange != null) {
458                broker.acknowledge(consumerExchange, ack);
459            }
460            return null;
461        }
462    
463        public Response processMessagePull(MessagePull pull) throws Exception {
464            return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
465        }
466    
467        public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
468            broker.processDispatchNotification(notification);
469            return null;
470        }
471    
472        public Response processAddDestination(DestinationInfo info) throws Exception {
473            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
474            broker.addDestinationInfo(cs.getContext(), info);
475            if (info.getDestination().isTemporary()) {
476                cs.addTempDestination(info);
477            }
478            return null;
479        }
480    
481        public Response processRemoveDestination(DestinationInfo info) throws Exception {
482            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
483            broker.removeDestinationInfo(cs.getContext(), info);
484            if (info.getDestination().isTemporary()) {
485                cs.removeTempDestination(info.getDestination());
486            }
487            return null;
488        }
489    
490        public Response processAddProducer(ProducerInfo info) throws Exception {
491            SessionId sessionId = info.getProducerId().getParentId();
492            ConnectionId connectionId = sessionId.getParentId();
493            TransportConnectionState cs = lookupConnectionState(connectionId);
494            if (cs == null) {
495                throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
496                        + connectionId);
497            }
498            SessionState ss = cs.getSessionState(sessionId);
499            if (ss == null) {
500                throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
501                        + sessionId);
502            }
503            // Avoid replaying dup commands
504            if (!ss.getProducerIds().contains(info.getProducerId())) {
505                ActiveMQDestination destination = info.getDestination();
506                if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
507                    if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
508                        throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
509                    }
510                }
511                broker.addProducer(cs.getContext(), info);
512                try {
513                    ss.addProducer(info);
514                } catch (IllegalStateException e) {
515                    broker.removeProducer(cs.getContext(), info);
516                }
517    
518            }
519            return null;
520        }
521    
522        public Response processRemoveProducer(ProducerId id) throws Exception {
523            SessionId sessionId = id.getParentId();
524            ConnectionId connectionId = sessionId.getParentId();
525            TransportConnectionState cs = lookupConnectionState(connectionId);
526            SessionState ss = cs.getSessionState(sessionId);
527            if (ss == null) {
528                throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
529                        + sessionId);
530            }
531            ProducerState ps = ss.removeProducer(id);
532            if (ps == null) {
533                throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
534            }
535            removeProducerBrokerExchange(id);
536            broker.removeProducer(cs.getContext(), ps.getInfo());
537            return null;
538        }
539    
540        public Response processAddConsumer(ConsumerInfo info) throws Exception {
541            SessionId sessionId = info.getConsumerId().getParentId();
542            ConnectionId connectionId = sessionId.getParentId();
543            TransportConnectionState cs = lookupConnectionState(connectionId);
544            if (cs == null) {
545                throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
546                        + connectionId);
547            }
548            SessionState ss = cs.getSessionState(sessionId);
549            if (ss == null) {
550                throw new IllegalStateException(broker.getBrokerName()
551                        + " Cannot add a consumer to a session that had not been registered: " + sessionId);
552            }
553            // Avoid replaying dup commands
554            if (!ss.getConsumerIds().contains(info.getConsumerId())) {
555                ActiveMQDestination destination = info.getDestination();
556                if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
557                    if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
558                        throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
559                    }
560                }
561    
562                broker.addConsumer(cs.getContext(), info);
563                try {
564                    ss.addConsumer(info);
565                    addConsumerBrokerExchange(info.getConsumerId());
566                } catch (IllegalStateException e) {
567                    broker.removeConsumer(cs.getContext(), info);
568                }
569    
570            }
571            return null;
572        }
573    
574        public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
575            SessionId sessionId = id.getParentId();
576            ConnectionId connectionId = sessionId.getParentId();
577            TransportConnectionState cs = lookupConnectionState(connectionId);
578            if (cs == null) {
579                throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
580                        + connectionId);
581            }
582            SessionState ss = cs.getSessionState(sessionId);
583            if (ss == null) {
584                throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
585                        + sessionId);
586            }
587            ConsumerState consumerState = ss.removeConsumer(id);
588            if (consumerState == null) {
589                throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
590            }
591            ConsumerInfo info = consumerState.getInfo();
592            info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
593            broker.removeConsumer(cs.getContext(), consumerState.getInfo());
594            removeConsumerBrokerExchange(id);
595            return null;
596        }
597    
598        public Response processAddSession(SessionInfo info) throws Exception {
599            ConnectionId connectionId = info.getSessionId().getParentId();
600            TransportConnectionState cs = lookupConnectionState(connectionId);
601            // Avoid replaying dup commands
602            if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
603                broker.addSession(cs.getContext(), info);
604                try {
605                    cs.addSession(info);
606                } catch (IllegalStateException e) {
607                    e.printStackTrace();
608                    broker.removeSession(cs.getContext(), info);
609                }
610            }
611            return null;
612        }
613    
614        public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
615            ConnectionId connectionId = id.getParentId();
616            TransportConnectionState cs = lookupConnectionState(connectionId);
617            if (cs == null) {
618                throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
619            }
620            SessionState session = cs.getSessionState(id);
621            if (session == null) {
622                throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
623            }
624            // Don't let new consumers or producers get added while we are closing
625            // this down.
626            session.shutdown();
627            // Cascade the connection stop to the consumers and producers.
628            for (ConsumerId consumerId : session.getConsumerIds()) {
629                try {
630                    processRemoveConsumer(consumerId, lastDeliveredSequenceId);
631                } catch (Throwable e) {
632                    LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
633                }
634            }
635            for (ProducerId producerId : session.getProducerIds()) {
636                try {
637                    processRemoveProducer(producerId);
638                } catch (Throwable e) {
639                    LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
640                }
641            }
642            cs.removeSession(id);
643            broker.removeSession(cs.getContext(), session.getInfo());
644            return null;
645        }
646    
647        public Response processAddConnection(ConnectionInfo info) throws Exception {
648            // if the broker service has slave attached, wait for the slave to be
649            // attached to allow client connection. slave connection is fine
650            if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
651                    && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
652                ServiceSupport.dispose(transport);
653                return new ExceptionResponse(new Exception("Master's slave not attached yet."));
654            }
655            // Older clients should have been defaulting this field to true.. but
656            // they were not.
657            if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
658                info.setClientMaster(true);
659            }
660            TransportConnectionState state;
661            // Make sure 2 concurrent connections by the same ID only generate 1
662            // TransportConnectionState object.
663            synchronized (brokerConnectionStates) {
664                state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
665                if (state == null) {
666                    state = new TransportConnectionState(info, this);
667                    brokerConnectionStates.put(info.getConnectionId(), state);
668                }
669                state.incrementReference();
670            }
671            // If there are 2 concurrent connections for the same connection id,
672            // then last one in wins, we need to sync here
673            // to figure out the winner.
674            synchronized (state.getConnectionMutex()) {
675                if (state.getConnection() != this) {
676                    LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
677                    state.getConnection().stop();
678                    LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
679                            + state.getConnection().getRemoteAddress());
680                    state.setConnection(this);
681                    state.reset(info);
682                }
683            }
684            registerConnectionState(info.getConnectionId(), state);
685            LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress() + ", info: " + info);
686            this.faultTolerantConnection = info.isFaultTolerant();
687            // Setup the context.
688            String clientId = info.getClientId();
689            context = new ConnectionContext();
690            context.setBroker(broker);
691            context.setClientId(clientId);
692            context.setClientMaster(info.isClientMaster());
693            context.setConnection(this);
694            context.setConnectionId(info.getConnectionId());
695            context.setConnector(connector);
696            context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
697            context.setNetworkConnection(networkConnection);
698            context.setFaultTolerant(faultTolerantConnection);
699            context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
700            context.setUserName(info.getUserName());
701            context.setWireFormatInfo(wireFormatInfo);
702            context.setReconnect(info.isFailoverReconnect());
703            this.manageable = info.isManageable();
704            context.setConnectionState(state);
705            state.setContext(context);
706            state.setConnection(this);
707            if (info.getClientIp() == null) {
708                info.setClientIp(getRemoteAddress());
709            }
710    
711            try {
712                broker.addConnection(context, info);
713            } catch (Exception e) {
714                synchronized (brokerConnectionStates) {
715                    brokerConnectionStates.remove(info.getConnectionId());
716                }
717                unregisterConnectionState(info.getConnectionId());
718                LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString());
719                if (LOG.isDebugEnabled()) {
720                    LOG.debug("Exception detail:", e);
721                }
722                if (e instanceof SecurityException) {
723                    // close this down - in case the peer of this transport doesn't play nice
724                    delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
725                }
726                throw e;
727            }
728            if (info.isManageable()) {
729                // send ConnectionCommand
730                ConnectionControl command = this.connector.getConnectionControl();
731                command.setFaultTolerant(broker.isFaultTolerantConfiguration());
732                if (info.isFailoverReconnect()) {
733                    command.setRebalanceConnection(false);
734                }
735                dispatchAsync(command);
736            }
737            return null;
738        }
739    
740        public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
741                throws InterruptedException {
742            LOG.debug("remove connection id: " + id);
743            TransportConnectionState cs = lookupConnectionState(id);
744            if (cs != null) {
745                // Don't allow things to be added to the connection state while we
746                // are shutting down.
747                cs.shutdown();
748                // Cascade the connection stop to the sessions.
749                for (SessionId sessionId : cs.getSessionIds()) {
750                    try {
751                        processRemoveSession(sessionId, lastDeliveredSequenceId);
752                    } catch (Throwable e) {
753                        SERVICELOG.warn("Failed to remove session " + sessionId, e);
754                    }
755                }
756                // Cascade the connection stop to temp destinations.
757                for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
758                    DestinationInfo di = iter.next();
759                    try {
760                        broker.removeDestination(cs.getContext(), di.getDestination(), 0);
761                    } catch (Throwable e) {
762                        SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
763                    }
764                    iter.remove();
765                }
766                try {
767                    broker.removeConnection(cs.getContext(), cs.getInfo(), null);
768                } catch (Throwable e) {
769                    SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString());
770                    if (LOG.isDebugEnabled()) {
771                        SERVICELOG.debug("Exception detail:", e);
772                    }
773                }
774                TransportConnectionState state = unregisterConnectionState(id);
775                if (state != null) {
776                    synchronized (brokerConnectionStates) {
777                        // If we are the last reference, we should remove the state
778                        // from the broker.
779                        if (state.decrementReference() == 0) {
780                            brokerConnectionStates.remove(id);
781                        }
782                    }
783                }
784            }
785            return null;
786        }
787    
788        public Response processProducerAck(ProducerAck ack) throws Exception {
789            // A broker should not get ProducerAck messages.
790            return null;
791        }
792    
793        public Connector getConnector() {
794            return connector;
795        }
796    
797        public void dispatchSync(Command message) {
798            try {
799                processDispatch(message);
800            } catch (IOException e) {
801                serviceExceptionAsync(e);
802            }
803        }
804    
805        public void dispatchAsync(Command message) {
806            if (!stopping.get()) {
807                if (taskRunner == null) {
808                    dispatchSync(message);
809                } else {
810                    synchronized (dispatchQueue) {
811                        dispatchQueue.add(message);
812                    }
813                    try {
814                        taskRunner.wakeup();
815                    } catch (InterruptedException e) {
816                        Thread.currentThread().interrupt();
817                    }
818                }
819            } else {
820                if (message.isMessageDispatch()) {
821                    MessageDispatch md = (MessageDispatch) message;
822                    Runnable sub = md.getTransmitCallback();
823                    broker.postProcessDispatch(md);
824                    if (sub != null) {
825                        sub.run();
826                    }
827                }
828            }
829        }
830    
831        protected void processDispatch(Command command) throws IOException {
832            final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
833            try {
834                if (!stopping.get()) {
835                    if (messageDispatch != null) {
836                        broker.preProcessDispatch(messageDispatch);
837                    }
838                    dispatch(command);
839                }
840            } finally {
841                if (messageDispatch != null) {
842                    Runnable sub = messageDispatch.getTransmitCallback();
843                    broker.postProcessDispatch(messageDispatch);
844                    if (sub != null) {
845                        sub.run();
846                    }
847                }
848            }
849        }
850    
851        public boolean iterate() {
852            try {
853                if (pendingStop || stopping.get()) {
854                    if (dispatchStopped.compareAndSet(false, true)) {
855                        if (transportException.get() == null) {
856                            try {
857                                dispatch(new ShutdownInfo());
858                            } catch (Throwable ignore) {
859                            }
860                        }
861                        dispatchStoppedLatch.countDown();
862                    }
863                    return false;
864                }
865                if (!dispatchStopped.get()) {
866                    Command command = null;
867                    synchronized (dispatchQueue) {
868                        if (dispatchQueue.isEmpty()) {
869                            return false;
870                        }
871                        command = dispatchQueue.remove(0);
872                    }
873                    processDispatch(command);
874                    return true;
875                }
876                return false;
877            } catch (IOException e) {
878                if (dispatchStopped.compareAndSet(false, true)) {
879                    dispatchStoppedLatch.countDown();
880                }
881                serviceExceptionAsync(e);
882                return false;
883            }
884        }
885    
886        /**
887         * Returns the statistics for this connection
888         */
889        public ConnectionStatistics getStatistics() {
890            return statistics;
891        }
892    
893        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
894            return messageAuthorizationPolicy;
895        }
896    
897        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
898            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
899        }
900    
901        public boolean isManageable() {
902            return manageable;
903        }
904    
905        public void start() throws Exception {
906            try {
907                synchronized (this) {
908                    starting = true;
909                    if (taskRunnerFactory != null) {
910                        taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
911                                + getRemoteAddress());
912                    } else {
913                        taskRunner = null;
914                    }
915                    transport.start();
916                    active = true;
917                    BrokerInfo info = connector.getBrokerInfo().copy();
918                    if (connector.isUpdateClusterClients()) {
919                        info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
920                    } else {
921                        info.setPeerBrokerInfos(null);
922                    }
923                    dispatchAsync(info);
924    
925                    connector.onStarted(this);
926                }
927            } catch (Exception e) {
928                // Force clean up on an error starting up.
929                pendingStop = true;
930                throw e;
931            } finally {
932                // stop() can be called from within the above block,
933                // but we want to be sure start() completes before
934                // stop() runs, so queue the stop until right now:
935                setStarting(false);
936                if (isPendingStop()) {
937                    LOG.debug("Calling the delayed stop() after start() " + this);
938                    stop();
939                }
940            }
941        }
942    
943        public void stop() throws Exception {
944            // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
945            // as their lifecycle is handled elsewhere
946    
947            stopAsync();
948            while (!stopped.await(5, TimeUnit.SECONDS)) {
949                LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
950            }
951        }
952    
953        public void delayedStop(final int waitTime, final String reason, Throwable cause) {
954            if (waitTime > 0) {
955                synchronized (this) {
956                    pendingStop = true;
957                    stopError = cause;
958                }
959                try {
960                    stopTaskRunnerFactory.execute(new Runnable() {
961                        public void run() {
962                            try {
963                                Thread.sleep(waitTime);
964                                stopAsync();
965                                LOG.info("Stopping " + transport.getRemoteAddress() + " because " + reason);
966                            } catch (InterruptedException e) {
967                            }
968                        }
969                    });
970                } catch (Throwable t) {
971                    LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
972                }
973            }
974        }
975    
976        public void stopAsync() {
977            // If we're in the middle of starting then go no further... for now.
978            synchronized (this) {
979                pendingStop = true;
980                if (starting) {
981                    LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
982                    return;
983                }
984            }
985            if (stopping.compareAndSet(false, true)) {
986                // Let all the connection contexts know we are shutting down
987                // so that in progress operations can notice and unblock.
988                List<TransportConnectionState> connectionStates = listConnectionStates();
989                for (TransportConnectionState cs : connectionStates) {
990                    ConnectionContext connectionContext = cs.getContext();
991                    if (connectionContext != null) {
992                        connectionContext.getStopping().set(true);
993                    }
994                }
995                try {
996                    stopTaskRunnerFactory.execute(new Runnable() {
997                        public void run() {
998                            serviceLock.writeLock().lock();
999                            try {
1000                                doStop();
1001                            } catch (Throwable e) {
1002                                LOG.debug("Error occurred while shutting down a connection " + this, e);
1003                            } finally {
1004                                stopped.countDown();
1005                                serviceLock.writeLock().unlock();
1006                            }
1007                        }
1008                    });
1009                } catch (Throwable t) {
1010                    LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
1011                    stopped.countDown();
1012                }
1013            }
1014        }
1015    
1016        @Override
1017        public String toString() {
1018            return "Transport Connection to: " + transport.getRemoteAddress();
1019        }
1020    
1021        protected void doStop() throws Exception {
1022            LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
1023            connector.onStopped(this);
1024            try {
1025                synchronized (this) {
1026                    if (masterBroker != null) {
1027                        masterBroker.stop();
1028                    }
1029                    if (duplexBridge != null) {
1030                        duplexBridge.stop();
1031                    }
1032                }
1033            } catch (Exception ignore) {
1034                LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
1035            }
1036            try {
1037                transport.stop();
1038                LOG.debug("Stopped transport: " + transport.getRemoteAddress());
1039            } catch (Exception e) {
1040                LOG.debug("Could not stop transport to " + transport.getRemoteAddress() + ". This exception is ignored.", e);
1041            }
1042            if (taskRunner != null) {
1043                taskRunner.shutdown(1);
1044                taskRunner = null;
1045            }
1046            active = false;
1047            // Run the MessageDispatch callbacks so that message references get
1048            // cleaned up.
1049            synchronized (dispatchQueue) {
1050                for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1051                    Command command = iter.next();
1052                    if (command.isMessageDispatch()) {
1053                        MessageDispatch md = (MessageDispatch) command;
1054                        Runnable sub = md.getTransmitCallback();
1055                        broker.postProcessDispatch(md);
1056                        if (sub != null) {
1057                            sub.run();
1058                        }
1059                    }
1060                }
1061                dispatchQueue.clear();
1062            }
1063            //
1064            // Remove all logical connection associated with this connection
1065            // from the broker.
1066            if (!broker.isStopped()) {
1067                List<TransportConnectionState> connectionStates = listConnectionStates();
1068                connectionStates = listConnectionStates();
1069                for (TransportConnectionState cs : connectionStates) {
1070                    cs.getContext().getStopping().set(true);
1071                    try {
1072                        LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
1073                        processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
1074                    } catch (Throwable ignore) {
1075                        ignore.printStackTrace();
1076                    }
1077                }
1078            }
1079            LOG.debug("Connection Stopped: {}", getRemoteAddress());
1080        }
1081    
1082        /**
1083         * @return Returns the blockedCandidate.
1084         */
1085        public boolean isBlockedCandidate() {
1086            return blockedCandidate;
1087        }
1088    
1089        /**
1090         * @param blockedCandidate The blockedCandidate to set.
1091         */
1092        public void setBlockedCandidate(boolean blockedCandidate) {
1093            this.blockedCandidate = blockedCandidate;
1094        }
1095    
1096        /**
1097         * @return Returns the markedCandidate.
1098         */
1099        public boolean isMarkedCandidate() {
1100            return markedCandidate;
1101        }
1102    
1103        /**
1104         * @param markedCandidate The markedCandidate to set.
1105         */
1106        public void setMarkedCandidate(boolean markedCandidate) {
1107            this.markedCandidate = markedCandidate;
1108            if (!markedCandidate) {
1109                timeStamp = 0;
1110                blockedCandidate = false;
1111            }
1112        }
1113    
1114        /**
1115         * @param slow The slow to set.
1116         */
1117        public void setSlow(boolean slow) {
1118            this.slow = slow;
1119        }
1120    
1121        /**
1122         * @return true if the Connection is slow
1123         */
1124        public boolean isSlow() {
1125            return slow;
1126        }
1127    
1128        /**
1129         * @return true if the Connection is potentially blocked
1130         */
1131        public boolean isMarkedBlockedCandidate() {
1132            return markedCandidate;
1133        }
1134    
1135        /**
1136         * Mark the Connection, so we can deem if it's collectable on the next sweep
1137         */
1138        public void doMark() {
1139            if (timeStamp == 0) {
1140                timeStamp = System.currentTimeMillis();
1141            }
1142        }
1143    
1144        /**
1145         * @return if after being marked, the Connection is still writing
1146         */
1147        public boolean isBlocked() {
1148            return blocked;
1149        }
1150    
1151        /**
1152         * @return true if the Connection is connected
1153         */
1154        public boolean isConnected() {
1155            return connected;
1156        }
1157    
1158        /**
1159         * @param blocked The blocked to set.
1160         */
1161        public void setBlocked(boolean blocked) {
1162            this.blocked = blocked;
1163        }
1164    
1165        /**
1166         * @param connected The connected to set.
1167         */
1168        public void setConnected(boolean connected) {
1169            this.connected = connected;
1170        }
1171    
1172        /**
1173         * @return true if the Connection is active
1174         */
1175        public boolean isActive() {
1176            return active;
1177        }
1178    
1179        /**
1180         * @param active The active to set.
1181         */
1182        public void setActive(boolean active) {
1183            this.active = active;
1184        }
1185    
1186        /**
1187         * @return true if the Connection is starting
1188         */
1189        public synchronized boolean isStarting() {
1190            return starting;
1191        }
1192    
1193        public synchronized boolean isNetworkConnection() {
1194            return networkConnection;
1195        }
1196    
1197        public boolean isFaultTolerantConnection() {
1198            return this.faultTolerantConnection;
1199        }
1200    
1201        protected synchronized void setStarting(boolean starting) {
1202            this.starting = starting;
1203        }
1204    
1205        /**
1206         * @return true if the Connection needs to stop
1207         */
1208        public synchronized boolean isPendingStop() {
1209            return pendingStop;
1210        }
1211    
1212        protected synchronized void setPendingStop(boolean pendingStop) {
1213            this.pendingStop = pendingStop;
1214        }
1215    
1216        public Response processBrokerInfo(BrokerInfo info) {
1217            if (info.isSlaveBroker()) {
1218                BrokerService bService = connector.getBrokerService();
1219                // Do we only support passive slaves - or does the slave want to be
1220                // passive ?
1221                boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
1222                if (passive == false) {
1223    
1224                    // stream messages from this broker (the master) to
1225                    // the slave
1226                    MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
1227                    masterBroker = new MasterBroker(parent, transport);
1228                    masterBroker.startProcessing();
1229                }
1230                LOG.info((passive ? "Passive" : "Active") + " Slave Broker " + info.getBrokerName() + " is attached");
1231                bService.slaveConnectionEstablished();
1232            } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1233                // so this TransportConnection is the rear end of a network bridge
1234                // We have been requested to create a two way pipe ...
1235                try {
1236                    Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1237                    Map<String, String> props = createMap(properties);
1238                    NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1239                    IntrospectionSupport.setProperties(config, props, "");
1240                    config.setBrokerName(broker.getBrokerName());
1241    
1242                    // check for existing duplex connection hanging about
1243    
1244                    // We first look if existing network connection already exists for the same broker Id and network connector name
1245                    // It's possible in case of brief network fault to have this transport connector side of the connection always active
1246                    // and the duplex network connector side wanting to open a new one
1247                    // In this case, the old connection must be broken
1248                    String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1249                    CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1250                    synchronized (connections) {
1251                        for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
1252                            TransportConnection c = iter.next();
1253                            if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1254                                LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
1255                                c.stopAsync();
1256                                // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1257                                c.getStopped().await(1, TimeUnit.SECONDS);
1258                            }
1259                        }
1260                        setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1261                    }
1262                    URI uri = broker.getVmConnectorURI();
1263                    HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
1264                    map.put("network", "true");
1265                    map.put("async", "false");
1266                    uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
1267                    Transport localTransport = TransportFactory.connect(uri);
1268                    Transport remoteBridgeTransport = new ResponseCorrelator(transport);
1269                    String duplexName = localTransport.toString();
1270                    if (duplexName.contains("#")) {
1271                        duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1272                    }
1273                    MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
1274                    listener.setCreatedByDuplex(true);
1275                    duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1276                    duplexBridge.setBrokerService(broker.getBrokerService());
1277                    // now turn duplex off this side
1278                    info.setDuplexConnection(false);
1279                    duplexBridge.setCreatedByDuplex(true);
1280                    duplexBridge.duplexStart(this, brokerInfo, info);
1281                    LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
1282                    return null;
1283                } catch (TransportDisposedIOException e) {
1284                    LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
1285                    return null;
1286                } catch (Exception e) {
1287                    LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId, e);
1288                    return null;
1289                }
1290            }
1291            // We only expect to get one broker info command per connection
1292            if (this.brokerInfo != null) {
1293                LOG.warn("Unexpected extra broker info command received: " + info);
1294            }
1295            this.brokerInfo = info;
1296            networkConnection = true;
1297            List<TransportConnectionState> connectionStates = listConnectionStates();
1298            for (TransportConnectionState cs : connectionStates) {
1299                cs.getContext().setNetworkConnection(true);
1300            }
1301            return null;
1302        }
1303    
1304        @SuppressWarnings({"unchecked", "rawtypes"})
1305        private HashMap<String, String> createMap(Properties properties) {
1306            return new HashMap(properties);
1307        }
1308    
1309        protected void dispatch(Command command) throws IOException {
1310            try {
1311                setMarkedCandidate(true);
1312                transport.oneway(command);
1313            } finally {
1314                setMarkedCandidate(false);
1315            }
1316        }
1317    
1318        public String getRemoteAddress() {
1319            return transport.getRemoteAddress();
1320        }
1321    
1322        public String getConnectionId() {
1323            List<TransportConnectionState> connectionStates = listConnectionStates();
1324            for (TransportConnectionState cs : connectionStates) {
1325                if (cs.getInfo().getClientId() != null) {
1326                    return cs.getInfo().getClientId();
1327                }
1328                return cs.getInfo().getConnectionId().toString();
1329            }
1330            return null;
1331        }
1332    
1333        public void updateClient(ConnectionControl control) {
1334            if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1335                    && this.wireFormatInfo.getVersion() >= 6) {
1336                dispatchAsync(control);
1337            }
1338        }
1339    
1340        private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1341            ProducerBrokerExchange result = producerExchanges.get(id);
1342            if (result == null) {
1343                synchronized (producerExchanges) {
1344                    result = new ProducerBrokerExchange();
1345                    TransportConnectionState state = lookupConnectionState(id);
1346                    context = state.getContext();
1347                    result.setConnectionContext(context);
1348                    if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1349                        result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
1350                    }
1351                    SessionState ss = state.getSessionState(id.getParentId());
1352                    if (ss != null) {
1353                        result.setProducerState(ss.getProducerState(id));
1354                        ProducerState producerState = ss.getProducerState(id);
1355                        if (producerState != null && producerState.getInfo() != null) {
1356                            ProducerInfo info = producerState.getInfo();
1357                            result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1358                        }
1359                    }
1360                    producerExchanges.put(id, result);
1361                }
1362            } else {
1363                context = result.getConnectionContext();
1364            }
1365            return result;
1366        }
1367    
1368        private void removeProducerBrokerExchange(ProducerId id) {
1369            synchronized (producerExchanges) {
1370                producerExchanges.remove(id);
1371            }
1372        }
1373    
1374        private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1375            ConsumerBrokerExchange result = consumerExchanges.get(id);
1376            return result;
1377        }
1378    
1379        private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
1380            ConsumerBrokerExchange result = consumerExchanges.get(id);
1381            if (result == null) {
1382                synchronized (consumerExchanges) {
1383                    result = new ConsumerBrokerExchange();
1384                    TransportConnectionState state = lookupConnectionState(id);
1385                    context = state.getContext();
1386                    result.setConnectionContext(context);
1387                    SessionState ss = state.getSessionState(id.getParentId());
1388                    if (ss != null) {
1389                        ConsumerState cs = ss.getConsumerState(id);
1390                        if (cs != null) {
1391                            ConsumerInfo info = cs.getInfo();
1392                            if (info != null) {
1393                                if (info.getDestination() != null && info.getDestination().isPattern()) {
1394                                    result.setWildcard(true);
1395                                }
1396                            }
1397                        }
1398                    }
1399                    consumerExchanges.put(id, result);
1400                }
1401            }
1402            return result;
1403        }
1404    
1405        private void removeConsumerBrokerExchange(ConsumerId id) {
1406            synchronized (consumerExchanges) {
1407                consumerExchanges.remove(id);
1408            }
1409        }
1410    
1411        public int getProtocolVersion() {
1412            return protocolVersion.get();
1413        }
1414    
1415        public Response processControlCommand(ControlCommand command) throws Exception {
1416            String control = command.getCommand();
1417            if (control != null && control.equals("shutdown")) {
1418                System.exit(0);
1419            }
1420            return null;
1421        }
1422    
1423        public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1424            return null;
1425        }
1426    
1427        public Response processConnectionControl(ConnectionControl control) throws Exception {
1428            if (control != null) {
1429                faultTolerantConnection = control.isFaultTolerant();
1430            }
1431            return null;
1432        }
1433    
1434        public Response processConnectionError(ConnectionError error) throws Exception {
1435            return null;
1436        }
1437    
1438        public Response processConsumerControl(ConsumerControl control) throws Exception {
1439            ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1440            broker.processConsumerControl(consumerExchange, control);
1441            return null;
1442        }
1443    
1444        protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1445                                                                                TransportConnectionState state) {
1446            TransportConnectionState cs = null;
1447            if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1448                // swap implementations
1449                TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1450                newRegister.intialize(connectionStateRegister);
1451                connectionStateRegister = newRegister;
1452            }
1453            cs = connectionStateRegister.registerConnectionState(connectionId, state);
1454            return cs;
1455        }
1456    
1457        protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1458            return connectionStateRegister.unregisterConnectionState(connectionId);
1459        }
1460    
1461        protected synchronized List<TransportConnectionState> listConnectionStates() {
1462            return connectionStateRegister.listConnectionStates();
1463        }
1464    
1465        protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1466            return connectionStateRegister.lookupConnectionState(connectionId);
1467        }
1468    
1469        protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1470            return connectionStateRegister.lookupConnectionState(id);
1471        }
1472    
1473        protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1474            return connectionStateRegister.lookupConnectionState(id);
1475        }
1476    
1477        protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1478            return connectionStateRegister.lookupConnectionState(id);
1479        }
1480    
1481        protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1482            return connectionStateRegister.lookupConnectionState(connectionId);
1483        }
1484    
1485        protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1486            this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1487        }
1488    
1489        protected synchronized String getDuplexNetworkConnectorId() {
1490            return this.duplexNetworkConnectorId;
1491        }
1492    
1493        public boolean isStopping() {
1494            return stopping.get();
1495        }
1496    
1497        protected CountDownLatch getStopped() {
1498            return stopped;
1499        }
1500    
1501        private int getProducerCount(ConnectionId connectionId) {
1502            int result = 0;
1503            TransportConnectionState cs = lookupConnectionState(connectionId);
1504            if (cs != null) {
1505                for (SessionId sessionId : cs.getSessionIds()) {
1506                    SessionState sessionState = cs.getSessionState(sessionId);
1507                    if (sessionState != null) {
1508                        result += sessionState.getProducerIds().size();
1509                    }
1510                }
1511            }
1512            return result;
1513        }
1514    
1515        private int getConsumerCount(ConnectionId connectionId) {
1516            int result = 0;
1517            TransportConnectionState cs = lookupConnectionState(connectionId);
1518            if (cs != null) {
1519                for (SessionId sessionId : cs.getSessionIds()) {
1520                    SessionState sessionState = cs.getSessionState(sessionId);
1521                    if (sessionState != null) {
1522                        result += sessionState.getConsumerIds().size();
1523                    }
1524                }
1525            }
1526            return result;
1527        }
1528    }