001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.EOFException;
020    import java.io.IOException;
021    import java.net.SocketException;
022    import java.net.URI;
023    import java.util.HashMap;
024    import java.util.Iterator;
025    import java.util.LinkedList;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Properties;
029    import java.util.concurrent.ConcurrentHashMap;
030    import java.util.concurrent.CopyOnWriteArrayList;
031    import java.util.concurrent.CountDownLatch;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicBoolean;
034    import java.util.concurrent.atomic.AtomicInteger;
035    import java.util.concurrent.atomic.AtomicReference;
036    import java.util.concurrent.locks.ReentrantReadWriteLock;
037    
038    import javax.transaction.xa.XAResource;
039    
040    import org.apache.activemq.advisory.AdvisorySupport;
041    import org.apache.activemq.broker.region.ConnectionStatistics;
042    import org.apache.activemq.broker.region.RegionBroker;
043    import org.apache.activemq.command.ActiveMQDestination;
044    import org.apache.activemq.command.BrokerInfo;
045    import org.apache.activemq.command.Command;
046    import org.apache.activemq.command.CommandTypes;
047    import org.apache.activemq.command.ConnectionControl;
048    import org.apache.activemq.command.ConnectionError;
049    import org.apache.activemq.command.ConnectionId;
050    import org.apache.activemq.command.ConnectionInfo;
051    import org.apache.activemq.command.ConsumerControl;
052    import org.apache.activemq.command.ConsumerId;
053    import org.apache.activemq.command.ConsumerInfo;
054    import org.apache.activemq.command.ControlCommand;
055    import org.apache.activemq.command.DataArrayResponse;
056    import org.apache.activemq.command.DestinationInfo;
057    import org.apache.activemq.command.ExceptionResponse;
058    import org.apache.activemq.command.FlushCommand;
059    import org.apache.activemq.command.IntegerResponse;
060    import org.apache.activemq.command.KeepAliveInfo;
061    import org.apache.activemq.command.Message;
062    import org.apache.activemq.command.MessageAck;
063    import org.apache.activemq.command.MessageDispatch;
064    import org.apache.activemq.command.MessageDispatchNotification;
065    import org.apache.activemq.command.MessagePull;
066    import org.apache.activemq.command.ProducerAck;
067    import org.apache.activemq.command.ProducerId;
068    import org.apache.activemq.command.ProducerInfo;
069    import org.apache.activemq.command.RemoveSubscriptionInfo;
070    import org.apache.activemq.command.Response;
071    import org.apache.activemq.command.SessionId;
072    import org.apache.activemq.command.SessionInfo;
073    import org.apache.activemq.command.ShutdownInfo;
074    import org.apache.activemq.command.TransactionId;
075    import org.apache.activemq.command.TransactionInfo;
076    import org.apache.activemq.command.WireFormatInfo;
077    import org.apache.activemq.network.DemandForwardingBridge;
078    import org.apache.activemq.network.MBeanNetworkListener;
079    import org.apache.activemq.network.NetworkBridgeConfiguration;
080    import org.apache.activemq.network.NetworkBridgeFactory;
081    import org.apache.activemq.security.MessageAuthorizationPolicy;
082    import org.apache.activemq.state.CommandVisitor;
083    import org.apache.activemq.state.ConnectionState;
084    import org.apache.activemq.state.ConsumerState;
085    import org.apache.activemq.state.ProducerState;
086    import org.apache.activemq.state.SessionState;
087    import org.apache.activemq.state.TransactionState;
088    import org.apache.activemq.thread.Task;
089    import org.apache.activemq.thread.TaskRunner;
090    import org.apache.activemq.thread.TaskRunnerFactory;
091    import org.apache.activemq.transaction.Transaction;
092    import org.apache.activemq.transport.DefaultTransportListener;
093    import org.apache.activemq.transport.ResponseCorrelator;
094    import org.apache.activemq.transport.TransmitCallback;
095    import org.apache.activemq.transport.Transport;
096    import org.apache.activemq.transport.TransportDisposedIOException;
097    import org.apache.activemq.transport.TransportFactory;
098    import org.apache.activemq.util.IntrospectionSupport;
099    import org.apache.activemq.util.MarshallingSupport;
100    import org.apache.activemq.util.URISupport;
101    import org.slf4j.Logger;
102    import org.slf4j.LoggerFactory;
103    import org.slf4j.MDC;
104    
105    public class TransportConnection implements Connection, Task, CommandVisitor {
    // General-purpose logger for this class.
    private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
    // Logger used when reporting transport failures (see serviceTransportException).
    private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
    // Logger used when reporting errors raised while servicing commands.
    private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
    // Keeps track of the broker and connector that created this connection.
    protected final Broker broker;
    protected final TransportConnector connector;
    // Keeps track of the state of the connections; obtained from the
    // RegionBroker adaptor in the constructor.
    protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
    // The broker and wireformat info that was exchanged.
    protected BrokerInfo brokerInfo;
    // Commands waiting to be dispatched; getDispatchQueueSize() synchronizes
    // on this list when reading its size.
    protected final List<Command> dispatchQueue = new LinkedList<Command>();
    // NOTE(review): presumably drives asynchronous dispatch; the original
    // remark suggested this could be pushed down into the transport layer.
    protected TaskRunner taskRunner;
    // Last transport-level failure, set by serviceTransportException.
    protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
    protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
    private final Transport transport;
    private MessageAuthorizationPolicy messageAuthorizationPolicy;
    private WireFormatInfo wireFormatInfo;
    // True while serviceException is reporting a generic error, so that a
    // nested call for another generic error is ignored.
    private boolean inServiceException;
    private final ConnectionStatistics statistics = new ConnectionStatistics();
    private boolean manageable;
    private boolean slow;
    private boolean markedCandidate;
    private boolean blockedCandidate;
    private boolean blocked;
    private boolean connected;
    private boolean active;
    private boolean starting;
    // When set, service() stops visiting commands and answers with stopError.
    private boolean pendingStop;
    private long timeStamp;
    // Checked before reacting to errors; errors are ignored once it is set.
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final CountDownLatch stopped = new CountDownLatch(1);
    // Ensures serviceExceptionAsync starts at most one handler thread.
    private final AtomicBoolean asyncException = new AtomicBoolean(false);
    private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
    private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
    private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
    // Context of the command currently being serviced; cleared at the end of service().
    private ConnectionContext context;
    private boolean networkConnection;
    private boolean faultTolerantConnection;
    // Starts at CommandTypes.PROTOCOL_VERSION and is overwritten with the
    // version carried by the WireFormatInfo in processWireFormat.
    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
    private DemandForwardingBridge duplexBridge;
    private final TaskRunnerFactory taskRunnerFactory;
    private final TaskRunnerFactory stopTaskRunnerFactory;
    private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
    // Read lock is held while a command or transport exception is serviced.
    private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
    private String duplexNetworkConnectorId;
    // Error recorded by serviceException when the broker was stopped.
    private Throwable stopError = null;
156    
157        /**
158         * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
159         *                          else commands are sent async.
160         * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
161         */
162        public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
163                                   TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
164            this.connector = connector;
165            this.broker = broker;
166            RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
167            brokerConnectionStates = rb.getConnectionStates();
168            if (connector != null) {
169                this.statistics.setParent(connector.getStatistics());
170                this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
171            }
172            this.taskRunnerFactory = taskRunnerFactory;
173            this.stopTaskRunnerFactory = stopTaskRunnerFactory;
174            this.transport = transport;
175            this.transport.setTransportListener(new DefaultTransportListener() {
176                @Override
177                public void onCommand(Object o) {
178                    serviceLock.readLock().lock();
179                    try {
180                        if (!(o instanceof Command)) {
181                            throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
182                        }
183                        Command command = (Command) o;
184                        Response response = service(command);
185                        if (response != null) {
186                            dispatchSync(response);
187                        }
188                    } finally {
189                        serviceLock.readLock().unlock();
190                    }
191                }
192    
193                @Override
194                public void onException(IOException exception) {
195                    serviceLock.readLock().lock();
196                    try {
197                        serviceTransportException(exception);
198                    } finally {
199                        serviceLock.readLock().unlock();
200                    }
201                }
202            });
203            connected = true;
204        }
205    
206        /**
207         * Returns the number of messages to be dispatched to this connection
208         *
209         * @return size of dispatch queue
210         */
211        @Override
212        public int getDispatchQueueSize() {
213            synchronized (dispatchQueue) {
214                return dispatchQueue.size();
215            }
216        }
217    
218        public void serviceTransportException(IOException e) {
219            BrokerService bService = connector.getBrokerService();
220            if (bService.isShutdownOnSlaveFailure()) {
221                if (brokerInfo != null) {
222                    if (brokerInfo.isSlaveBroker()) {
223                        LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
224                        try {
225                            doStop();
226                            bService.stop();
227                        } catch (Exception ex) {
228                            LOG.warn("Failed to stop the master", ex);
229                        }
230                    }
231                }
232            }
233            if (!stopping.get()) {
234                transportException.set(e);
235                if (TRANSPORTLOG.isDebugEnabled()) {
236                    TRANSPORTLOG.debug(this + " failed: " + e, e);
237                } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
238                    TRANSPORTLOG.warn(this + " failed: " + e);
239                }
240                stopAsync();
241            }
242        }
243    
244        private boolean expected(IOException e) {
245            return isStomp() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
246        }
247    
248        private boolean isStomp() {
249            URI uri = connector.getUri();
250            return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
251        }
252    
253        /**
254         * Calls the serviceException method in an async thread. Since handling a
255         * service exception closes a socket, we should not tie up broker threads
256         * since client sockets may hang or cause deadlocks.
257         */
258        @Override
259        public void serviceExceptionAsync(final IOException e) {
260            if (asyncException.compareAndSet(false, true)) {
261                new Thread("Async Exception Handler") {
262                    @Override
263                    public void run() {
264                        serviceException(e);
265                    }
266                }.start();
267            }
268        }
269    
270        /**
271         * Closes a clients connection due to a detected error. Errors are ignored
272         * if: the client is closing or broker is closing. Otherwise, the connection
273         * error transmitted to the client before stopping it's transport.
274         */
275        @Override
276        public void serviceException(Throwable e) {
277            // are we a transport exception such as not being able to dispatch
278            // synchronously to a transport
279            if (e instanceof IOException) {
280                serviceTransportException((IOException) e);
281            } else if (e.getClass() == BrokerStoppedException.class) {
282                // Handle the case where the broker is stopped
283                // But the client is still connected.
284                if (!stopping.get()) {
285                    if (SERVICELOG.isDebugEnabled()) {
286                        SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
287                    }
288                    ConnectionError ce = new ConnectionError();
289                    ce.setException(e);
290                    dispatchSync(ce);
291                    // Record the error that caused the transport to stop
292                    this.stopError = e;
293                    // Wait a little bit to try to get the output buffer to flush
294                    // the exception notification to the client.
295                    try {
296                        Thread.sleep(500);
297                    } catch (InterruptedException ie) {
298                        Thread.currentThread().interrupt();
299                    }
300                    // Worst case is we just kill the connection before the
301                    // notification gets to him.
302                    stopAsync();
303                }
304            } else if (!stopping.get() && !inServiceException) {
305                inServiceException = true;
306                try {
307                    SERVICELOG.warn("Async error occurred: " + e, e);
308                    ConnectionError ce = new ConnectionError();
309                    ce.setException(e);
310                    if (pendingStop) {
311                        dispatchSync(ce);
312                    } else {
313                        dispatchAsync(ce);
314                    }
315                } finally {
316                    inServiceException = false;
317                }
318            }
319        }
320    
321        @Override
322        public Response service(Command command) {
323            MDC.put("activemq.connector", connector.getUri().toString());
324            Response response = null;
325            boolean responseRequired = command.isResponseRequired();
326            int commandId = command.getCommandId();
327            try {
328                if (!pendingStop) {
329                    response = command.visit(this);
330                } else {
331                    response = new ExceptionResponse(this.stopError);
332                }
333            } catch (Throwable e) {
334                if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
335                    SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
336                            + " command: " + command + ", exception: " + e, e);
337                }
338    
339                if (responseRequired) {
340                    response = new ExceptionResponse(e);
341                } else {
342                    serviceException(e);
343                }
344            }
345            if (responseRequired) {
346                if (response == null) {
347                    response = new Response();
348                }
349                response.setCorrelationId(commandId);
350            }
351            // The context may have been flagged so that the response is not
352            // sent.
353            if (context != null) {
354                if (context.isDontSendReponse()) {
355                    context.setDontSendReponse(false);
356                    response = null;
357                }
358                context = null;
359            }
360            MDC.remove("activemq.connector");
361            return response;
362        }
363    
364        @Override
365        public Response processKeepAlive(KeepAliveInfo info) throws Exception {
366            return null;
367        }
368    
369        @Override
370        public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
371            broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
372            return null;
373        }
374    
375        @Override
376        public Response processWireFormat(WireFormatInfo info) throws Exception {
377            wireFormatInfo = info;
378            protocolVersion.set(info.getVersion());
379            return null;
380        }
381    
382        @Override
383        public Response processShutdown(ShutdownInfo info) throws Exception {
384            stopAsync();
385            return null;
386        }
387    
388        @Override
389        public Response processFlush(FlushCommand command) throws Exception {
390            return null;
391        }
392    
393        @Override
394        public Response processBeginTransaction(TransactionInfo info) throws Exception {
395            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
396            context = null;
397            if (cs != null) {
398                context = cs.getContext();
399            }
400            if (cs == null) {
401                throw new NullPointerException("Context is null");
402            }
403            // Avoid replaying dup commands
404            if (cs.getTransactionState(info.getTransactionId()) == null) {
405                cs.addTransactionState(info.getTransactionId());
406                broker.beginTransaction(context, info.getTransactionId());
407            }
408            return null;
409        }
410    
411        @Override
412        public Response processEndTransaction(TransactionInfo info) throws Exception {
413            // No need to do anything. This packet is just sent by the client
414            // make sure he is synced with the server as commit command could
415            // come from a different connection.
416            return null;
417        }
418    
419        @Override
420        public Response processPrepareTransaction(TransactionInfo info) throws Exception {
421            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
422            context = null;
423            if (cs != null) {
424                context = cs.getContext();
425            }
426            if (cs == null) {
427                throw new NullPointerException("Context is null");
428            }
429            TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
430            if (transactionState == null) {
431                throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
432                        + info.getTransactionId());
433            }
434            // Avoid dups.
435            if (!transactionState.isPrepared()) {
436                transactionState.setPrepared(true);
437                int result = broker.prepareTransaction(context, info.getTransactionId());
438                transactionState.setPreparedResult(result);
439                if (result == XAResource.XA_RDONLY) {
440                    // we are done, no further rollback or commit from TM
441                    cs.removeTransactionState(info.getTransactionId());
442                }
443                IntegerResponse response = new IntegerResponse(result);
444                return response;
445            } else {
446                IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
447                return response;
448            }
449        }
450    
451        @Override
452        public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
453            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
454            context = cs.getContext();
455            cs.removeTransactionState(info.getTransactionId());
456            broker.commitTransaction(context, info.getTransactionId(), true);
457            return null;
458        }
459    
460        @Override
461        public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
462            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
463            context = cs.getContext();
464            cs.removeTransactionState(info.getTransactionId());
465            broker.commitTransaction(context, info.getTransactionId(), false);
466            return null;
467        }
468    
469        @Override
470        public Response processRollbackTransaction(TransactionInfo info) throws Exception {
471            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
472            context = cs.getContext();
473            cs.removeTransactionState(info.getTransactionId());
474            broker.rollbackTransaction(context, info.getTransactionId());
475            return null;
476        }
477    
478        @Override
479        public Response processForgetTransaction(TransactionInfo info) throws Exception {
480            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
481            context = cs.getContext();
482            broker.forgetTransaction(context, info.getTransactionId());
483            return null;
484        }
485    
486        @Override
487        public Response processRecoverTransactions(TransactionInfo info) throws Exception {
488            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
489            context = cs.getContext();
490            TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
491            return new DataArrayResponse(preparedTransactions);
492        }
493    
494        @Override
495        public Response processMessage(Message messageSend) throws Exception {
496            ProducerId producerId = messageSend.getProducerId();
497            ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
498            if (producerExchange.canDispatch(messageSend)) {
499                broker.send(producerExchange, messageSend);
500            }
501            return null;
502        }
503    
504        @Override
505        public Response processMessageAck(MessageAck ack) throws Exception {
506            ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
507            if (consumerExchange != null) {
508                broker.acknowledge(consumerExchange, ack);
509            }
510            return null;
511        }
512    
513        @Override
514        public Response processMessagePull(MessagePull pull) throws Exception {
515            return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
516        }
517    
518        @Override
519        public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
520            broker.processDispatchNotification(notification);
521            return null;
522        }
523    
524        @Override
525        public Response processAddDestination(DestinationInfo info) throws Exception {
526            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
527            broker.addDestinationInfo(cs.getContext(), info);
528            if (info.getDestination().isTemporary()) {
529                cs.addTempDestination(info);
530            }
531            return null;
532        }
533    
534        @Override
535        public Response processRemoveDestination(DestinationInfo info) throws Exception {
536            TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
537            broker.removeDestinationInfo(cs.getContext(), info);
538            if (info.getDestination().isTemporary()) {
539                cs.removeTempDestination(info.getDestination());
540            }
541            return null;
542        }
543    
544        @Override
545        public Response processAddProducer(ProducerInfo info) throws Exception {
546            SessionId sessionId = info.getProducerId().getParentId();
547            ConnectionId connectionId = sessionId.getParentId();
548            TransportConnectionState cs = lookupConnectionState(connectionId);
549            if (cs == null) {
550                throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
551                        + connectionId);
552            }
553            SessionState ss = cs.getSessionState(sessionId);
554            if (ss == null) {
555                throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
556                        + sessionId);
557            }
558            // Avoid replaying dup commands
559            if (!ss.getProducerIds().contains(info.getProducerId())) {
560                ActiveMQDestination destination = info.getDestination();
561                if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
562                    if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
563                        throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
564                    }
565                }
566                broker.addProducer(cs.getContext(), info);
567                try {
568                    ss.addProducer(info);
569                } catch (IllegalStateException e) {
570                    broker.removeProducer(cs.getContext(), info);
571                }
572    
573            }
574            return null;
575        }
576    
577        @Override
578        public Response processRemoveProducer(ProducerId id) throws Exception {
579            SessionId sessionId = id.getParentId();
580            ConnectionId connectionId = sessionId.getParentId();
581            TransportConnectionState cs = lookupConnectionState(connectionId);
582            SessionState ss = cs.getSessionState(sessionId);
583            if (ss == null) {
584                throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
585                        + sessionId);
586            }
587            ProducerState ps = ss.removeProducer(id);
588            if (ps == null) {
589                throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
590            }
591            removeProducerBrokerExchange(id);
592            broker.removeProducer(cs.getContext(), ps.getInfo());
593            return null;
594        }
595    
596        @Override
597        public Response processAddConsumer(ConsumerInfo info) throws Exception {
598            SessionId sessionId = info.getConsumerId().getParentId();
599            ConnectionId connectionId = sessionId.getParentId();
600            TransportConnectionState cs = lookupConnectionState(connectionId);
601            if (cs == null) {
602                throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
603                        + connectionId);
604            }
605            SessionState ss = cs.getSessionState(sessionId);
606            if (ss == null) {
607                throw new IllegalStateException(broker.getBrokerName()
608                        + " Cannot add a consumer to a session that had not been registered: " + sessionId);
609            }
610            // Avoid replaying dup commands
611            if (!ss.getConsumerIds().contains(info.getConsumerId())) {
612                ActiveMQDestination destination = info.getDestination();
613                if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
614                    if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
615                        throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
616                    }
617                }
618    
619                broker.addConsumer(cs.getContext(), info);
620                try {
621                    ss.addConsumer(info);
622                    addConsumerBrokerExchange(info.getConsumerId());
623                } catch (IllegalStateException e) {
624                    broker.removeConsumer(cs.getContext(), info);
625                }
626    
627            }
628            return null;
629        }
630    
631        @Override
632        public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
633            SessionId sessionId = id.getParentId();
634            ConnectionId connectionId = sessionId.getParentId();
635            TransportConnectionState cs = lookupConnectionState(connectionId);
636            if (cs == null) {
637                throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
638                        + connectionId);
639            }
640            SessionState ss = cs.getSessionState(sessionId);
641            if (ss == null) {
642                throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
643                        + sessionId);
644            }
645            ConsumerState consumerState = ss.removeConsumer(id);
646            if (consumerState == null) {
647                throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
648            }
649            ConsumerInfo info = consumerState.getInfo();
650            info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
651            broker.removeConsumer(cs.getContext(), consumerState.getInfo());
652            removeConsumerBrokerExchange(id);
653            return null;
654        }
655    
656        @Override
657        public Response processAddSession(SessionInfo info) throws Exception {
658            ConnectionId connectionId = info.getSessionId().getParentId();
659            TransportConnectionState cs = lookupConnectionState(connectionId);
660            // Avoid replaying dup commands
661            if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
662                broker.addSession(cs.getContext(), info);
663                try {
664                    cs.addSession(info);
665                } catch (IllegalStateException e) {
666                    e.printStackTrace();
667                    broker.removeSession(cs.getContext(), info);
668                }
669            }
670            return null;
671        }
672    
673        @Override
674        public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
675            ConnectionId connectionId = id.getParentId();
676            TransportConnectionState cs = lookupConnectionState(connectionId);
677            if (cs == null) {
678                throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
679            }
680            SessionState session = cs.getSessionState(id);
681            if (session == null) {
682                throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
683            }
684            // Don't let new consumers or producers get added while we are closing
685            // this down.
686            session.shutdown();
687            // Cascade the connection stop to the consumers and producers.
688            for (ConsumerId consumerId : session.getConsumerIds()) {
689                try {
690                    processRemoveConsumer(consumerId, lastDeliveredSequenceId);
691                } catch (Throwable e) {
692                    LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
693                }
694            }
695            for (ProducerId producerId : session.getProducerIds()) {
696                try {
697                    processRemoveProducer(producerId);
698                } catch (Throwable e) {
699                    LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
700                }
701            }
702            cs.removeSession(id);
703            broker.removeSession(cs.getContext(), session.getInfo());
704            return null;
705        }
706    
    /**
     * Registers a logical connection on this transport connection.
     * <p>
     * Looks up (or creates) the shared {@code TransportConnectionState} for the
     * connection id, stops a different connection that currently owns that
     * state, builds a fresh {@code ConnectionContext} and finally announces the
     * connection to the broker. When the broker rejects the connection, the
     * state registrations made here are removed again and the exception is
     * rethrown.
     *
     * @param info the connection being added; may be modified (client-master flag, client IP)
     * @return always {@code null}
     * @throws Exception whatever {@code broker.addConnection} throws, after clean-up
     */
    @Override
    public Response processAddConnection(ConnectionInfo info) throws Exception {
        // Older clients should have been defaulting this field to true.. but
        // they were not.
        if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
            info.setClientMaster(true);
        }
        TransportConnectionState state;
        // Make sure 2 concurrent connections by the same ID only generate 1
        // TransportConnectionState object.
        synchronized (brokerConnectionStates) {
            state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
            if (state == null) {
                state = new TransportConnectionState(info, this);
                brokerConnectionStates.put(info.getConnectionId(), state);
            }
            // Counted per add; processRemoveConnection decrements it again.
            state.incrementReference();
        }
        // If there are 2 concurrent connections for the same connection id,
        // then last one in wins, we need to sync here
        // to figure out the winner.
        synchronized (state.getConnectionMutex()) {
            if (state.getConnection() != this) {
                LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
                state.getConnection().stop();
                LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
                        + state.getConnection().getRemoteAddress());
                state.setConnection(this);
                state.reset(info);
            }
        }
        registerConnectionState(info.getConnectionId(), state);
        LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress() + ", info: " + info);
        this.faultTolerantConnection = info.isFaultTolerant();
        // Setup the context.
        String clientId = info.getClientId();
        context = new ConnectionContext();
        context.setBroker(broker);
        context.setClientId(clientId);
        context.setClientMaster(info.isClientMaster());
        context.setConnection(this);
        context.setConnectionId(info.getConnectionId());
        context.setConnector(connector);
        context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
        context.setNetworkConnection(networkConnection);
        context.setFaultTolerant(faultTolerantConnection);
        context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
        context.setUserName(info.getUserName());
        context.setWireFormatInfo(wireFormatInfo);
        context.setReconnect(info.isFailoverReconnect());
        this.manageable = info.isManageable();
        context.setConnectionState(state);
        state.setContext(context);
        state.setConnection(this);
        // Fall back to the transport's remote address when the client did not report an IP.
        if (info.getClientIp() == null) {
            info.setClientIp(getRemoteAddress());
        }

        try {
            broker.addConnection(context, info);
        } catch (Exception e) {
            // Undo both registrations made above before reporting the failure.
            synchronized (brokerConnectionStates) {
                brokerConnectionStates.remove(info.getConnectionId());
            }
            unregisterConnectionState(info.getConnectionId());
            LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Exception detail:", e);
            }
            if (e instanceof SecurityException) {
                // close this down - in case the peer of this transport doesn't play nice
                delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
            }
            throw e;
        }
        if (info.isManageable()) {
            // send ConnectionCommand
            ConnectionControl command = this.connector.getConnectionControl();
            command.setFaultTolerant(broker.isFaultTolerantConfiguration());
            // A client that is reconnecting via failover is not asked to rebalance.
            if (info.isFailoverReconnect()) {
                command.setRebalanceConnection(false);
            }
            dispatchAsync(command);
        }
        return null;
    }
793    
794        @Override
795        public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
796                throws InterruptedException {
797            LOG.debug("remove connection id: " + id);
798            TransportConnectionState cs = lookupConnectionState(id);
799            if (cs != null) {
800                // Don't allow things to be added to the connection state while we
801                // are shutting down.
802                cs.shutdown();
803                // Cascade the connection stop to the sessions.
804                for (SessionId sessionId : cs.getSessionIds()) {
805                    try {
806                        processRemoveSession(sessionId, lastDeliveredSequenceId);
807                    } catch (Throwable e) {
808                        SERVICELOG.warn("Failed to remove session " + sessionId, e);
809                    }
810                }
811                // Cascade the connection stop to temp destinations.
812                for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
813                    DestinationInfo di = iter.next();
814                    try {
815                        broker.removeDestination(cs.getContext(), di.getDestination(), 0);
816                    } catch (Throwable e) {
817                        SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
818                    }
819                    iter.remove();
820                }
821                try {
822                    broker.removeConnection(cs.getContext(), cs.getInfo(), null);
823                } catch (Throwable e) {
824                    SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString());
825                    if (LOG.isDebugEnabled()) {
826                        SERVICELOG.debug("Exception detail:", e);
827                    }
828                }
829                TransportConnectionState state = unregisterConnectionState(id);
830                if (state != null) {
831                    synchronized (brokerConnectionStates) {
832                        // If we are the last reference, we should remove the state
833                        // from the broker.
834                        if (state.decrementReference() == 0) {
835                            brokerConnectionStates.remove(id);
836                        }
837                    }
838                }
839            }
840            return null;
841        }
842    
    /**
     * Ignores the command.
     *
     * @param ack the received acknowledgement; not inspected
     * @return always {@code null}
     */
    @Override
    public Response processProducerAck(ProducerAck ack) throws Exception {
        // A broker should not get ProducerAck messages.
        return null;
    }
848    
849        @Override
850        public Connector getConnector() {
851            return connector;
852        }
853    
854        @Override
855        public void dispatchSync(Command message) {
856            try {
857                processDispatch(message);
858            } catch (IOException e) {
859                serviceExceptionAsync(e);
860            }
861        }
862    
863        @Override
864        public void dispatchAsync(Command message) {
865            if (!stopping.get()) {
866                if (taskRunner == null) {
867                    dispatchSync(message);
868                } else {
869                    synchronized (dispatchQueue) {
870                        dispatchQueue.add(message);
871                    }
872                    try {
873                        taskRunner.wakeup();
874                    } catch (InterruptedException e) {
875                        Thread.currentThread().interrupt();
876                    }
877                }
878            } else {
879                if (message.isMessageDispatch()) {
880                    MessageDispatch md = (MessageDispatch) message;
881                    TransmitCallback sub = md.getTransmitCallback();
882                    broker.postProcessDispatch(md);
883                    if (sub != null) {
884                        sub.onFailure();
885                    }
886                }
887            }
888        }
889    
890        protected void processDispatch(Command command) throws IOException {
891            MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
892            try {
893                if (!stopping.get()) {
894                    if (messageDispatch != null) {
895                        broker.preProcessDispatch(messageDispatch);
896                    }
897                    dispatch(command);
898                }
899            } catch (IOException e) {
900                if (messageDispatch != null) {
901                    TransmitCallback sub = messageDispatch.getTransmitCallback();
902                    broker.postProcessDispatch(messageDispatch);
903                    if (sub != null) {
904                        sub.onFailure();
905                    }
906                    messageDispatch = null;
907                    throw e;
908                }
909            } finally {
910                if (messageDispatch != null) {
911                    TransmitCallback sub = messageDispatch.getTransmitCallback();
912                    broker.postProcessDispatch(messageDispatch);
913                    if (sub != null) {
914                        sub.onSuccess();
915                    }
916                }
917            }
918        }
919    
920        @Override
921        public boolean iterate() {
922            try {
923                if (pendingStop || stopping.get()) {
924                    if (dispatchStopped.compareAndSet(false, true)) {
925                        if (transportException.get() == null) {
926                            try {
927                                dispatch(new ShutdownInfo());
928                            } catch (Throwable ignore) {
929                            }
930                        }
931                        dispatchStoppedLatch.countDown();
932                    }
933                    return false;
934                }
935                if (!dispatchStopped.get()) {
936                    Command command = null;
937                    synchronized (dispatchQueue) {
938                        if (dispatchQueue.isEmpty()) {
939                            return false;
940                        }
941                        command = dispatchQueue.remove(0);
942                    }
943                    processDispatch(command);
944                    return true;
945                }
946                return false;
947            } catch (IOException e) {
948                if (dispatchStopped.compareAndSet(false, true)) {
949                    dispatchStoppedLatch.countDown();
950                }
951                serviceExceptionAsync(e);
952                return false;
953            }
954        }
955    
956        /**
957         * Returns the statistics for this connection
958         */
959        @Override
960        public ConnectionStatistics getStatistics() {
961            return statistics;
962        }
963    
964        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
965            return messageAuthorizationPolicy;
966        }
967    
    /**
     * @param messageAuthorizationPolicy the policy to store on this connection;
     *        it is copied into each connection context created by {@code processAddConnection}
     */
    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
    }
971    
972        @Override
973        public boolean isManageable() {
974            return manageable;
975        }
976    
977        @Override
978        public void start() throws Exception {
979            try {
980                synchronized (this) {
981                    starting = true;
982                    if (taskRunnerFactory != null) {
983                        taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
984                                + getRemoteAddress());
985                    } else {
986                        taskRunner = null;
987                    }
988                    transport.start();
989                    active = true;
990                    BrokerInfo info = connector.getBrokerInfo().copy();
991                    if (connector.isUpdateClusterClients()) {
992                        info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
993                    } else {
994                        info.setPeerBrokerInfos(null);
995                    }
996                    dispatchAsync(info);
997    
998                    connector.onStarted(this);
999                }
1000            } catch (Exception e) {
1001                // Force clean up on an error starting up.
1002                pendingStop = true;
1003                throw e;
1004            } finally {
1005                // stop() can be called from within the above block,
1006                // but we want to be sure start() completes before
1007                // stop() runs, so queue the stop until right now:
1008                setStarting(false);
1009                if (isPendingStop()) {
1010                    LOG.debug("Calling the delayed stop() after start() " + this);
1011                    stop();
1012                }
1013            }
1014        }
1015    
1016        @Override
1017        public void stop() throws Exception {
1018            // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
1019            // as their lifecycle is handled elsewhere
1020    
1021            stopAsync();
1022            while (!stopped.await(5, TimeUnit.SECONDS)) {
1023                LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
1024            }
1025        }
1026    
1027        public void delayedStop(final int waitTime, final String reason, Throwable cause) {
1028            if (waitTime > 0) {
1029                synchronized (this) {
1030                    pendingStop = true;
1031                    stopError = cause;
1032                }
1033                try {
1034                    stopTaskRunnerFactory.execute(new Runnable() {
1035                        @Override
1036                        public void run() {
1037                            try {
1038                                Thread.sleep(waitTime);
1039                                stopAsync();
1040                                LOG.info("Stopping " + transport.getRemoteAddress() + " because " + reason);
1041                            } catch (InterruptedException e) {
1042                            }
1043                        }
1044                    });
1045                } catch (Throwable t) {
1046                    LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
1047                }
1048            }
1049        }
1050    
1051        public void stopAsync() {
1052            // If we're in the middle of starting then go no further... for now.
1053            synchronized (this) {
1054                pendingStop = true;
1055                if (starting) {
1056                    LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
1057                    return;
1058                }
1059            }
1060            if (stopping.compareAndSet(false, true)) {
1061                // Let all the connection contexts know we are shutting down
1062                // so that in progress operations can notice and unblock.
1063                List<TransportConnectionState> connectionStates = listConnectionStates();
1064                for (TransportConnectionState cs : connectionStates) {
1065                    ConnectionContext connectionContext = cs.getContext();
1066                    if (connectionContext != null) {
1067                        connectionContext.getStopping().set(true);
1068                    }
1069                }
1070                try {
1071                    stopTaskRunnerFactory.execute(new Runnable() {
1072                        @Override
1073                        public void run() {
1074                            serviceLock.writeLock().lock();
1075                            try {
1076                                doStop();
1077                            } catch (Throwable e) {
1078                                LOG.debug("Error occurred while shutting down a connection " + this, e);
1079                            } finally {
1080                                stopped.countDown();
1081                                serviceLock.writeLock().unlock();
1082                            }
1083                        }
1084                    });
1085                } catch (Throwable t) {
1086                    LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
1087                    stopped.countDown();
1088                }
1089            }
1090        }
1091    
1092        @Override
1093        public String toString() {
1094            return "Transport Connection to: " + transport.getRemoteAddress();
1095        }
1096    
1097        protected void doStop() throws Exception {
1098            LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
1099            connector.onStopped(this);
1100            try {
1101                synchronized (this) {
1102                    if (duplexBridge != null) {
1103                        duplexBridge.stop();
1104                    }
1105                }
1106            } catch (Exception ignore) {
1107                LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
1108            }
1109            try {
1110                transport.stop();
1111                LOG.debug("Stopped transport: " + transport.getRemoteAddress());
1112            } catch (Exception e) {
1113                LOG.debug("Could not stop transport to " + transport.getRemoteAddress() + ". This exception is ignored.", e);
1114            }
1115            if (taskRunner != null) {
1116                taskRunner.shutdown(1);
1117                taskRunner = null;
1118            }
1119            active = false;
1120            // Run the MessageDispatch callbacks so that message references get
1121            // cleaned up.
1122            synchronized (dispatchQueue) {
1123                for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1124                    Command command = iter.next();
1125                    if (command.isMessageDispatch()) {
1126                        MessageDispatch md = (MessageDispatch) command;
1127                        TransmitCallback sub = md.getTransmitCallback();
1128                        broker.postProcessDispatch(md);
1129                        if (sub != null) {
1130                            sub.onFailure();
1131                        }
1132                    }
1133                }
1134                dispatchQueue.clear();
1135            }
1136            //
1137            // Remove all logical connection associated with this connection
1138            // from the broker.
1139            if (!broker.isStopped()) {
1140                List<TransportConnectionState> connectionStates = listConnectionStates();
1141                connectionStates = listConnectionStates();
1142                for (TransportConnectionState cs : connectionStates) {
1143                    cs.getContext().getStopping().set(true);
1144                    try {
1145                        LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
1146                        processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
1147                    } catch (Throwable ignore) {
1148                        ignore.printStackTrace();
1149                    }
1150                }
1151            }
1152            LOG.debug("Connection Stopped: {}", getRemoteAddress());
1153        }
1154    
1155        /**
1156         * @return Returns the blockedCandidate.
1157         */
1158        public boolean isBlockedCandidate() {
1159            return blockedCandidate;
1160        }
1161    
1162        /**
1163         * @param blockedCandidate The blockedCandidate to set.
1164         */
1165        public void setBlockedCandidate(boolean blockedCandidate) {
1166            this.blockedCandidate = blockedCandidate;
1167        }
1168    
1169        /**
1170         * @return Returns the markedCandidate.
1171         */
1172        public boolean isMarkedCandidate() {
1173            return markedCandidate;
1174        }
1175    
1176        /**
1177         * @param markedCandidate The markedCandidate to set.
1178         */
1179        public void setMarkedCandidate(boolean markedCandidate) {
1180            this.markedCandidate = markedCandidate;
1181            if (!markedCandidate) {
1182                timeStamp = 0;
1183                blockedCandidate = false;
1184            }
1185        }
1186    
1187        /**
1188         * @param slow The slow to set.
1189         */
1190        public void setSlow(boolean slow) {
1191            this.slow = slow;
1192        }
1193    
1194        /**
1195         * @return true if the Connection is slow
1196         */
1197        @Override
1198        public boolean isSlow() {
1199            return slow;
1200        }
1201    
1202        /**
1203         * @return true if the Connection is potentially blocked
1204         */
1205        public boolean isMarkedBlockedCandidate() {
1206            return markedCandidate;
1207        }
1208    
1209        /**
1210         * Mark the Connection, so we can deem if it's collectable on the next sweep
1211         */
1212        public void doMark() {
1213            if (timeStamp == 0) {
1214                timeStamp = System.currentTimeMillis();
1215            }
1216        }
1217    
1218        /**
1219         * @return if after being marked, the Connection is still writing
1220         */
1221        @Override
1222        public boolean isBlocked() {
1223            return blocked;
1224        }
1225    
1226        /**
1227         * @return true if the Connection is connected
1228         */
1229        @Override
1230        public boolean isConnected() {
1231            return connected;
1232        }
1233    
1234        /**
1235         * @param blocked The blocked to set.
1236         */
1237        public void setBlocked(boolean blocked) {
1238            this.blocked = blocked;
1239        }
1240    
1241        /**
1242         * @param connected The connected to set.
1243         */
1244        public void setConnected(boolean connected) {
1245            this.connected = connected;
1246        }
1247    
1248        /**
1249         * @return true if the Connection is active
1250         */
1251        @Override
1252        public boolean isActive() {
1253            return active;
1254        }
1255    
1256        /**
1257         * @param active The active to set.
1258         */
1259        public void setActive(boolean active) {
1260            this.active = active;
1261        }
1262    
1263        /**
1264         * @return true if the Connection is starting
1265         */
1266        public synchronized boolean isStarting() {
1267            return starting;
1268        }
1269    
1270        @Override
1271        public synchronized boolean isNetworkConnection() {
1272            return networkConnection;
1273        }
1274    
1275        @Override
1276        public boolean isFaultTolerantConnection() {
1277            return this.faultTolerantConnection;
1278        }
1279    
    /**
     * Sets the starting flag while holding this connection's monitor.
     *
     * @param starting the new value of the flag
     */
    protected synchronized void setStarting(boolean starting) {
        this.starting = starting;
    }
1283    
1284        /**
1285         * @return true if the Connection needs to stop
1286         */
1287        public synchronized boolean isPendingStop() {
1288            return pendingStop;
1289        }
1290    
    /**
     * Sets the pendingStop flag while holding this connection's monitor.
     *
     * @param pendingStop the new value of the flag
     */
    protected synchronized void setPendingStop(boolean pendingStop) {
        this.pendingStop = pendingStop;
    }
1294    
1295        @Override
1296        public Response processBrokerInfo(BrokerInfo info) {
1297            if (info.isSlaveBroker()) {
1298                LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: " + info.getBrokerName());
1299            } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
1300                // so this TransportConnection is the rear end of a network bridge
1301                // We have been requested to create a two way pipe ...
1302                try {
1303                    Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
1304                    Map<String, String> props = createMap(properties);
1305                    NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
1306                    IntrospectionSupport.setProperties(config, props, "");
1307                    config.setBrokerName(broker.getBrokerName());
1308    
1309                    // check for existing duplex connection hanging about
1310    
1311                    // We first look if existing network connection already exists for the same broker Id and network connector name
1312                    // It's possible in case of brief network fault to have this transport connector side of the connection always active
1313                    // and the duplex network connector side wanting to open a new one
1314                    // In this case, the old connection must be broken
1315                    String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
1316                    CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
1317                    synchronized (connections) {
1318                        for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
1319                            TransportConnection c = iter.next();
1320                            if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
1321                                LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
1322                                c.stopAsync();
1323                                // better to wait for a bit rather than get connection id already in use and failure to start new bridge
1324                                c.getStopped().await(1, TimeUnit.SECONDS);
1325                            }
1326                        }
1327                        setDuplexNetworkConnectorId(duplexNetworkConnectorId);
1328                    }
1329                    Transport localTransport = NetworkBridgeFactory.createLocalTransport(broker);
1330                    Transport remoteBridgeTransport = transport;
1331                    if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
1332                        // the vm transport case is already wrapped
1333                        remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport);
1334                    }
1335                    String duplexName = localTransport.toString();
1336                    if (duplexName.contains("#")) {
1337                        duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
1338                    }
1339                    MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
1340                    listener.setCreatedByDuplex(true);
1341                    duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
1342                    duplexBridge.setBrokerService(broker.getBrokerService());
1343                    // now turn duplex off this side
1344                    info.setDuplexConnection(false);
1345                    duplexBridge.setCreatedByDuplex(true);
1346                    duplexBridge.duplexStart(this, brokerInfo, info);
1347                    LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
1348                    return null;
1349                } catch (TransportDisposedIOException e) {
1350                    LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
1351                    return null;
1352                } catch (Exception e) {
1353                    LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId, e);
1354                    return null;
1355                }
1356            }
1357            // We only expect to get one broker info command per connection
1358            if (this.brokerInfo != null) {
1359                LOG.warn("Unexpected extra broker info command received: " + info);
1360            }
1361            this.brokerInfo = info;
1362            networkConnection = true;
1363            List<TransportConnectionState> connectionStates = listConnectionStates();
1364            for (TransportConnectionState cs : connectionStates) {
1365                cs.getContext().setNetworkConnection(true);
1366            }
1367            return null;
1368        }
1369    
1370        @SuppressWarnings({"unchecked", "rawtypes"})
1371        private HashMap<String, String> createMap(Properties properties) {
1372            return new HashMap(properties);
1373        }
1374    
1375        protected void dispatch(Command command) throws IOException {
1376            try {
1377                setMarkedCandidate(true);
1378                transport.oneway(command);
1379            } finally {
1380                setMarkedCandidate(false);
1381            }
1382        }
1383    
1384        @Override
1385        public String getRemoteAddress() {
1386            return transport.getRemoteAddress();
1387        }
1388    
1389        @Override
1390        public String getConnectionId() {
1391            List<TransportConnectionState> connectionStates = listConnectionStates();
1392            for (TransportConnectionState cs : connectionStates) {
1393                if (cs.getInfo().getClientId() != null) {
1394                    return cs.getInfo().getClientId();
1395                }
1396                return cs.getInfo().getConnectionId().toString();
1397            }
1398            return null;
1399        }
1400    
1401        @Override
1402        public void updateClient(ConnectionControl control) {
1403            if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1404                    && this.wireFormatInfo.getVersion() >= 6) {
1405                dispatchAsync(control);
1406            }
1407        }
1408    
1409        private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1410            ProducerBrokerExchange result = producerExchanges.get(id);
1411            if (result == null) {
1412                synchronized (producerExchanges) {
1413                    result = new ProducerBrokerExchange();
1414                    TransportConnectionState state = lookupConnectionState(id);
1415                    context = state.getContext();
1416                    result.setConnectionContext(context);
1417                    if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1418                        result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
1419                    }
1420                    SessionState ss = state.getSessionState(id.getParentId());
1421                    if (ss != null) {
1422                        result.setProducerState(ss.getProducerState(id));
1423                        ProducerState producerState = ss.getProducerState(id);
1424                        if (producerState != null && producerState.getInfo() != null) {
1425                            ProducerInfo info = producerState.getInfo();
1426                            result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1427                        }
1428                    }
1429                    producerExchanges.put(id, result);
1430                }
1431            } else {
1432                context = result.getConnectionContext();
1433            }
1434            return result;
1435        }
1436    
1437        private void removeProducerBrokerExchange(ProducerId id) {
1438            synchronized (producerExchanges) {
1439                producerExchanges.remove(id);
1440            }
1441        }
1442    
1443        private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1444            ConsumerBrokerExchange result = consumerExchanges.get(id);
1445            return result;
1446        }
1447    
1448        private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
1449            ConsumerBrokerExchange result = consumerExchanges.get(id);
1450            if (result == null) {
1451                synchronized (consumerExchanges) {
1452                    result = new ConsumerBrokerExchange();
1453                    TransportConnectionState state = lookupConnectionState(id);
1454                    context = state.getContext();
1455                    result.setConnectionContext(context);
1456                    SessionState ss = state.getSessionState(id.getParentId());
1457                    if (ss != null) {
1458                        ConsumerState cs = ss.getConsumerState(id);
1459                        if (cs != null) {
1460                            ConsumerInfo info = cs.getInfo();
1461                            if (info != null) {
1462                                if (info.getDestination() != null && info.getDestination().isPattern()) {
1463                                    result.setWildcard(true);
1464                                }
1465                            }
1466                        }
1467                    }
1468                    consumerExchanges.put(id, result);
1469                }
1470            }
1471            return result;
1472        }
1473    
1474        private void removeConsumerBrokerExchange(ConsumerId id) {
1475            synchronized (consumerExchanges) {
1476                consumerExchanges.remove(id);
1477            }
1478        }
1479    
1480        public int getProtocolVersion() {
1481            return protocolVersion.get();
1482        }
1483    
1484        @Override
1485        public Response processControlCommand(ControlCommand command) throws Exception {
1486            String control = command.getCommand();
1487            if (control != null && control.equals("shutdown")) {
1488                System.exit(0);
1489            }
1490            return null;
1491        }
1492    
1493        @Override
1494        public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1495            return null;
1496        }
1497    
1498        @Override
1499        public Response processConnectionControl(ConnectionControl control) throws Exception {
1500            if (control != null) {
1501                faultTolerantConnection = control.isFaultTolerant();
1502            }
1503            return null;
1504        }
1505    
1506        @Override
1507        public Response processConnectionError(ConnectionError error) throws Exception {
1508            return null;
1509        }
1510    
1511        @Override
1512        public Response processConsumerControl(ConsumerControl control) throws Exception {
1513            ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1514            broker.processConsumerControl(consumerExchange, control);
1515            return null;
1516        }
1517    
1518        protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1519                                                                                TransportConnectionState state) {
1520            TransportConnectionState cs = null;
1521            if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1522                // swap implementations
1523                TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1524                newRegister.intialize(connectionStateRegister);
1525                connectionStateRegister = newRegister;
1526            }
1527            cs = connectionStateRegister.registerConnectionState(connectionId, state);
1528            return cs;
1529        }
1530    
1531        protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1532            return connectionStateRegister.unregisterConnectionState(connectionId);
1533        }
1534    
1535        protected synchronized List<TransportConnectionState> listConnectionStates() {
1536            return connectionStateRegister.listConnectionStates();
1537        }
1538    
1539        protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1540            return connectionStateRegister.lookupConnectionState(connectionId);
1541        }
1542    
1543        protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1544            return connectionStateRegister.lookupConnectionState(id);
1545        }
1546    
1547        protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1548            return connectionStateRegister.lookupConnectionState(id);
1549        }
1550    
1551        protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1552            return connectionStateRegister.lookupConnectionState(id);
1553        }
1554    
1555        protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1556            return connectionStateRegister.lookupConnectionState(connectionId);
1557        }
1558    
1559        protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1560            this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1561        }
1562    
1563        protected synchronized String getDuplexNetworkConnectorId() {
1564            return this.duplexNetworkConnectorId;
1565        }
1566    
1567        public boolean isStopping() {
1568            return stopping.get();
1569        }
1570    
1571        protected CountDownLatch getStopped() {
1572            return stopped;
1573        }
1574    
1575        private int getProducerCount(ConnectionId connectionId) {
1576            int result = 0;
1577            TransportConnectionState cs = lookupConnectionState(connectionId);
1578            if (cs != null) {
1579                for (SessionId sessionId : cs.getSessionIds()) {
1580                    SessionState sessionState = cs.getSessionState(sessionId);
1581                    if (sessionState != null) {
1582                        result += sessionState.getProducerIds().size();
1583                    }
1584                }
1585            }
1586            return result;
1587        }
1588    
1589        private int getConsumerCount(ConnectionId connectionId) {
1590            int result = 0;
1591            TransportConnectionState cs = lookupConnectionState(connectionId);
1592            if (cs != null) {
1593                for (SessionId sessionId : cs.getSessionIds()) {
1594                    SessionState sessionState = cs.getSessionState(sessionId);
1595                    if (sessionState != null) {
1596                        result += sessionState.getConsumerIds().size();
1597                    }
1598                }
1599            }
1600            return result;
1601        }
1602    }