001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.io.EOFException;
020 import java.io.IOException;
021 import java.net.SocketException;
022 import java.net.URI;
023 import java.util.HashMap;
024 import java.util.Iterator;
025 import java.util.LinkedList;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Properties;
029 import java.util.concurrent.ConcurrentHashMap;
030 import java.util.concurrent.CopyOnWriteArrayList;
031 import java.util.concurrent.CountDownLatch;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.atomic.AtomicBoolean;
034 import java.util.concurrent.atomic.AtomicInteger;
035 import java.util.concurrent.atomic.AtomicReference;
036 import java.util.concurrent.locks.ReentrantReadWriteLock;
037
038 import javax.transaction.xa.XAResource;
039 import org.apache.activemq.advisory.AdvisorySupport;
040 import org.apache.activemq.broker.ft.MasterBroker;
041 import org.apache.activemq.broker.region.ConnectionStatistics;
042 import org.apache.activemq.broker.region.RegionBroker;
043 import org.apache.activemq.command.*;
044 import org.apache.activemq.network.DemandForwardingBridge;
045 import org.apache.activemq.network.MBeanNetworkListener;
046 import org.apache.activemq.network.NetworkBridgeConfiguration;
047 import org.apache.activemq.network.NetworkBridgeFactory;
048 import org.apache.activemq.security.MessageAuthorizationPolicy;
049 import org.apache.activemq.state.CommandVisitor;
050 import org.apache.activemq.state.ConnectionState;
051 import org.apache.activemq.state.ConsumerState;
052 import org.apache.activemq.state.ProducerState;
053 import org.apache.activemq.state.SessionState;
054 import org.apache.activemq.state.TransactionState;
055 import org.apache.activemq.thread.Task;
056 import org.apache.activemq.thread.TaskRunner;
057 import org.apache.activemq.thread.TaskRunnerFactory;
058 import org.apache.activemq.transaction.Transaction;
059 import org.apache.activemq.transport.DefaultTransportListener;
060 import org.apache.activemq.transport.ResponseCorrelator;
061 import org.apache.activemq.transport.Transport;
062 import org.apache.activemq.transport.TransportDisposedIOException;
063 import org.apache.activemq.transport.TransportFactory;
064 import org.apache.activemq.util.IntrospectionSupport;
065 import org.apache.activemq.util.MarshallingSupport;
066 import org.apache.activemq.util.ServiceSupport;
067 import org.apache.activemq.util.URISupport;
068 import org.slf4j.Logger;
069 import org.slf4j.LoggerFactory;
070 import org.slf4j.MDC;
071
072 public class TransportConnection implements Connection, Task, CommandVisitor {
073 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
074 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
075 private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service");
076 // Keeps track of the broker and connector that created this connection.
077 protected final Broker broker;
078 protected final TransportConnector connector;
079 // Keeps track of the state of the connections.
080 // protected final ConcurrentHashMap localConnectionStates=new
081 // ConcurrentHashMap();
082 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
083 // The broker and wireformat info that was exchanged.
084 protected BrokerInfo brokerInfo;
085 protected final List<Command> dispatchQueue = new LinkedList<Command>();
086 protected TaskRunner taskRunner;
087 protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
088 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
089 private MasterBroker masterBroker;
090 private final Transport transport;
091 private MessageAuthorizationPolicy messageAuthorizationPolicy;
092 private WireFormatInfo wireFormatInfo;
093 // Used to do async dispatch.. this should perhaps be pushed down into the
094 // transport layer..
095 private boolean inServiceException;
096 private final ConnectionStatistics statistics = new ConnectionStatistics();
097 private boolean manageable;
098 private boolean slow;
099 private boolean markedCandidate;
100 private boolean blockedCandidate;
101 private boolean blocked;
102 private boolean connected;
103 private boolean active;
104 private boolean starting;
105 private boolean pendingStop;
106 private long timeStamp;
107 private final AtomicBoolean stopping = new AtomicBoolean(false);
108 private final CountDownLatch stopped = new CountDownLatch(1);
109 private final AtomicBoolean asyncException = new AtomicBoolean(false);
110 private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
111 private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
112 private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
113 private ConnectionContext context;
114 private boolean networkConnection;
115 private boolean faultTolerantConnection;
116 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
117 private DemandForwardingBridge duplexBridge;
118 private final TaskRunnerFactory taskRunnerFactory;
119 private final TaskRunnerFactory stopTaskRunnerFactory;
120 private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
121 private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
122 private String duplexNetworkConnectorId;
123 private Throwable stopError = null;
124
/**
 * Creates a connection that services the commands arriving on the given
 * transport by installing a transport listener on it.
 *
 * @param connector the connector that created this connection; when non-null
 *        its statistics become the parent of this connection's statistics
 *        and its message authorization policy is adopted
 * @param transport the transport to read commands from and dispatch to
 * @param broker the broker that commands are delegated to; must expose a
 *        {@link RegionBroker} adaptor
 * @param taskRunnerFactory - can be null if you want direct dispatch to the transport
 *        else commands are sent async.
 * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection.
 */
public TransportConnection(TransportConnector connector, final Transport transport, Broker broker,
        TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) {
    this.connector = connector;
    this.broker = broker;
    RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
    brokerConnectionStates = rb.getConnectionStates();
    if (connector != null) {
        this.statistics.setParent(connector.getStatistics());
        this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy();
    }
    this.taskRunnerFactory = taskRunnerFactory;
    this.stopTaskRunnerFactory = stopTaskRunnerFactory;
    this.transport = transport;
    this.transport.setTransportListener(new DefaultTransportListener() {
        @Override
        public void onCommand(Object o) {
            serviceLock.readLock().lock();
            try {
                if (!(o instanceof Command)) {
                    // String concatenation (not o.toString()) so that a null payload
                    // is reported as a protocol violation rather than as an NPE.
                    throw new RuntimeException("Protocol violation - Command corrupted: " + o);
                }
                Command command = (Command) o;
                Response response = service(command);
                if (response != null) {
                    dispatchSync(response);
                }
            } finally {
                serviceLock.readLock().unlock();
            }
        }

        @Override
        public void onException(IOException exception) {
            serviceLock.readLock().lock();
            try {
                serviceTransportException(exception);
            } finally {
                serviceLock.readLock().unlock();
            }
        }
    });
    connected = true;
}
173
174 /**
175 * Returns the number of messages to be dispatched to this connection
176 *
177 * @return size of dispatch queue
178 */
179 public int getDispatchQueueSize() {
180 synchronized (dispatchQueue) {
181 return dispatchQueue.size();
182 }
183 }
184
185 public void serviceTransportException(IOException e) {
186 BrokerService bService = connector.getBrokerService();
187 if (bService.isShutdownOnSlaveFailure()) {
188 if (brokerInfo != null) {
189 if (brokerInfo.isSlaveBroker()) {
190 LOG.error("Slave has exception: " + e.getMessage() + " shutting down master now.", e);
191 try {
192 doStop();
193 bService.stop();
194 } catch (Exception ex) {
195 LOG.warn("Failed to stop the master", ex);
196 }
197 }
198 }
199 }
200 if (!stopping.get()) {
201 transportException.set(e);
202 if (TRANSPORTLOG.isDebugEnabled()) {
203 TRANSPORTLOG.debug(this + " failed: " + e, e);
204 } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
205 TRANSPORTLOG.warn(this + " failed: " + e);
206 }
207 stopAsync();
208 }
209 }
210
211 private boolean expected(IOException e) {
212 return isStomp() &&
213 ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException);
214 }
215
216 private boolean isStomp() {
217 URI uri = connector.getUri();
218 return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
219 }
220
221 /**
222 * Calls the serviceException method in an async thread. Since handling a
223 * service exception closes a socket, we should not tie up broker threads
224 * since client sockets may hang or cause deadlocks.
225 */
226 public void serviceExceptionAsync(final IOException e) {
227 if (asyncException.compareAndSet(false, true)) {
228 new Thread("Async Exception Handler") {
229 @Override
230 public void run() {
231 serviceException(e);
232 }
233 }.start();
234 }
235 }
236
237 /**
238 * Closes a clients connection due to a detected error. Errors are ignored
239 * if: the client is closing or broker is closing. Otherwise, the connection
240 * error transmitted to the client before stopping it's transport.
241 */
242 public void serviceException(Throwable e) {
243 // are we a transport exception such as not being able to dispatch
244 // synchronously to a transport
245 if (e instanceof IOException) {
246 serviceTransportException((IOException) e);
247 } else if (e.getClass() == BrokerStoppedException.class) {
248 // Handle the case where the broker is stopped
249 // But the client is still connected.
250 if (!stopping.get()) {
251 if (SERVICELOG.isDebugEnabled()) {
252 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection.");
253 }
254 ConnectionError ce = new ConnectionError();
255 ce.setException(e);
256 dispatchSync(ce);
257 // Record the error that caused the transport to stop
258 this.stopError = e;
259 // Wait a little bit to try to get the output buffer to flush
260 // the exption notification to the client.
261 try {
262 Thread.sleep(500);
263 } catch (InterruptedException ie) {
264 Thread.currentThread().interrupt();
265 }
266 // Worst case is we just kill the connection before the
267 // notification gets to him.
268 stopAsync();
269 }
270 } else if (!stopping.get() && !inServiceException) {
271 inServiceException = true;
272 try {
273 SERVICELOG.warn("Async error occurred: " + e, e);
274 ConnectionError ce = new ConnectionError();
275 ce.setException(e);
276 if (pendingStop) {
277 dispatchSync(ce);
278 } else {
279 dispatchAsync(ce);
280 }
281 } finally {
282 inServiceException = false;
283 }
284 }
285 }
286
287 public Response service(Command command) {
288 MDC.put("activemq.connector", connector.getUri().toString());
289 Response response = null;
290 boolean responseRequired = command.isResponseRequired();
291 int commandId = command.getCommandId();
292 try {
293 if (!pendingStop) {
294 response = command.visit(this);
295 } else {
296 response = new ExceptionResponse(this.stopError);
297 }
298 } catch (Throwable e) {
299 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
300 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async")
301 + " command: " + command + ", exception: " + e, e);
302 }
303
304 if (responseRequired) {
305 response = new ExceptionResponse(e);
306 } else {
307 serviceException(e);
308 }
309 }
310 if (responseRequired) {
311 if (response == null) {
312 response = new Response();
313 }
314 response.setCorrelationId(commandId);
315 }
316 // The context may have been flagged so that the response is not
317 // sent.
318 if (context != null) {
319 if (context.isDontSendReponse()) {
320 context.setDontSendReponse(false);
321 response = null;
322 }
323 context = null;
324 }
325 MDC.remove("activemq.connector");
326 return response;
327 }
328
329 public Response processKeepAlive(KeepAliveInfo info) throws Exception {
330 return null;
331 }
332
333 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception {
334 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info);
335 return null;
336 }
337
338 public Response processWireFormat(WireFormatInfo info) throws Exception {
339 wireFormatInfo = info;
340 protocolVersion.set(info.getVersion());
341 return null;
342 }
343
344 public Response processShutdown(ShutdownInfo info) throws Exception {
345 stopAsync();
346 return null;
347 }
348
349 public Response processFlush(FlushCommand command) throws Exception {
350 return null;
351 }
352
353 public Response processBeginTransaction(TransactionInfo info) throws Exception {
354 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
355 context = null;
356 if (cs != null) {
357 context = cs.getContext();
358 }
359 if (cs == null) {
360 throw new NullPointerException("Context is null");
361 }
362 // Avoid replaying dup commands
363 if (cs.getTransactionState(info.getTransactionId()) == null) {
364 cs.addTransactionState(info.getTransactionId());
365 broker.beginTransaction(context, info.getTransactionId());
366 }
367 return null;
368 }
369
370 public Response processEndTransaction(TransactionInfo info) throws Exception {
371 // No need to do anything. This packet is just sent by the client
372 // make sure he is synced with the server as commit command could
373 // come from a different connection.
374 return null;
375 }
376
377 public Response processPrepareTransaction(TransactionInfo info) throws Exception {
378 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
379 context = null;
380 if (cs != null) {
381 context = cs.getContext();
382 }
383 if (cs == null) {
384 throw new NullPointerException("Context is null");
385 }
386 TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
387 if (transactionState == null) {
388 throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: "
389 + info.getTransactionId());
390 }
391 // Avoid dups.
392 if (!transactionState.isPrepared()) {
393 transactionState.setPrepared(true);
394 int result = broker.prepareTransaction(context, info.getTransactionId());
395 transactionState.setPreparedResult(result);
396 if (result == XAResource.XA_RDONLY) {
397 // we are done, no further rollback or commit from TM
398 cs.removeTransactionState(info.getTransactionId());
399 }
400 IntegerResponse response = new IntegerResponse(result);
401 return response;
402 } else {
403 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
404 return response;
405 }
406 }
407
408 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
409 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
410 context = cs.getContext();
411 cs.removeTransactionState(info.getTransactionId());
412 broker.commitTransaction(context, info.getTransactionId(), true);
413 return null;
414 }
415
416 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
417 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
418 context = cs.getContext();
419 cs.removeTransactionState(info.getTransactionId());
420 broker.commitTransaction(context, info.getTransactionId(), false);
421 return null;
422 }
423
424 public Response processRollbackTransaction(TransactionInfo info) throws Exception {
425 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
426 context = cs.getContext();
427 cs.removeTransactionState(info.getTransactionId());
428 broker.rollbackTransaction(context, info.getTransactionId());
429 return null;
430 }
431
432 public Response processForgetTransaction(TransactionInfo info) throws Exception {
433 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
434 context = cs.getContext();
435 broker.forgetTransaction(context, info.getTransactionId());
436 return null;
437 }
438
439 public Response processRecoverTransactions(TransactionInfo info) throws Exception {
440 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
441 context = cs.getContext();
442 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context);
443 return new DataArrayResponse(preparedTransactions);
444 }
445
446 public Response processMessage(Message messageSend) throws Exception {
447 ProducerId producerId = messageSend.getProducerId();
448 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
449 if (producerExchange.canDispatch(messageSend)) {
450 broker.send(producerExchange, messageSend);
451 }
452 return null;
453 }
454
455 public Response processMessageAck(MessageAck ack) throws Exception {
456 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId());
457 if (consumerExchange != null) {
458 broker.acknowledge(consumerExchange, ack);
459 }
460 return null;
461 }
462
463 public Response processMessagePull(MessagePull pull) throws Exception {
464 return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
465 }
466
467 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception {
468 broker.processDispatchNotification(notification);
469 return null;
470 }
471
472 public Response processAddDestination(DestinationInfo info) throws Exception {
473 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
474 broker.addDestinationInfo(cs.getContext(), info);
475 if (info.getDestination().isTemporary()) {
476 cs.addTempDestination(info);
477 }
478 return null;
479 }
480
481 public Response processRemoveDestination(DestinationInfo info) throws Exception {
482 TransportConnectionState cs = lookupConnectionState(info.getConnectionId());
483 broker.removeDestinationInfo(cs.getContext(), info);
484 if (info.getDestination().isTemporary()) {
485 cs.removeTempDestination(info.getDestination());
486 }
487 return null;
488 }
489
490 public Response processAddProducer(ProducerInfo info) throws Exception {
491 SessionId sessionId = info.getProducerId().getParentId();
492 ConnectionId connectionId = sessionId.getParentId();
493 TransportConnectionState cs = lookupConnectionState(connectionId);
494 if (cs == null) {
495 throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
496 + connectionId);
497 }
498 SessionState ss = cs.getSessionState(sessionId);
499 if (ss == null) {
500 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
501 + sessionId);
502 }
503 // Avoid replaying dup commands
504 if (!ss.getProducerIds().contains(info.getProducerId())) {
505 ActiveMQDestination destination = info.getDestination();
506 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
507 if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
508 throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
509 }
510 }
511 broker.addProducer(cs.getContext(), info);
512 try {
513 ss.addProducer(info);
514 } catch (IllegalStateException e) {
515 broker.removeProducer(cs.getContext(), info);
516 }
517
518 }
519 return null;
520 }
521
522 public Response processRemoveProducer(ProducerId id) throws Exception {
523 SessionId sessionId = id.getParentId();
524 ConnectionId connectionId = sessionId.getParentId();
525 TransportConnectionState cs = lookupConnectionState(connectionId);
526 SessionState ss = cs.getSessionState(sessionId);
527 if (ss == null) {
528 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: "
529 + sessionId);
530 }
531 ProducerState ps = ss.removeProducer(id);
532 if (ps == null) {
533 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
534 }
535 removeProducerBrokerExchange(id);
536 broker.removeProducer(cs.getContext(), ps.getInfo());
537 return null;
538 }
539
540 public Response processAddConsumer(ConsumerInfo info) throws Exception {
541 SessionId sessionId = info.getConsumerId().getParentId();
542 ConnectionId connectionId = sessionId.getParentId();
543 TransportConnectionState cs = lookupConnectionState(connectionId);
544 if (cs == null) {
545 throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: "
546 + connectionId);
547 }
548 SessionState ss = cs.getSessionState(sessionId);
549 if (ss == null) {
550 throw new IllegalStateException(broker.getBrokerName()
551 + " Cannot add a consumer to a session that had not been registered: " + sessionId);
552 }
553 // Avoid replaying dup commands
554 if (!ss.getConsumerIds().contains(info.getConsumerId())) {
555 ActiveMQDestination destination = info.getDestination();
556 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
557 if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){
558 throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection());
559 }
560 }
561
562 broker.addConsumer(cs.getContext(), info);
563 try {
564 ss.addConsumer(info);
565 addConsumerBrokerExchange(info.getConsumerId());
566 } catch (IllegalStateException e) {
567 broker.removeConsumer(cs.getContext(), info);
568 }
569
570 }
571 return null;
572 }
573
574 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception {
575 SessionId sessionId = id.getParentId();
576 ConnectionId connectionId = sessionId.getParentId();
577 TransportConnectionState cs = lookupConnectionState(connectionId);
578 if (cs == null) {
579 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: "
580 + connectionId);
581 }
582 SessionState ss = cs.getSessionState(sessionId);
583 if (ss == null) {
584 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: "
585 + sessionId);
586 }
587 ConsumerState consumerState = ss.removeConsumer(id);
588 if (consumerState == null) {
589 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
590 }
591 ConsumerInfo info = consumerState.getInfo();
592 info.setLastDeliveredSequenceId(lastDeliveredSequenceId);
593 broker.removeConsumer(cs.getContext(), consumerState.getInfo());
594 removeConsumerBrokerExchange(id);
595 return null;
596 }
597
598 public Response processAddSession(SessionInfo info) throws Exception {
599 ConnectionId connectionId = info.getSessionId().getParentId();
600 TransportConnectionState cs = lookupConnectionState(connectionId);
601 // Avoid replaying dup commands
602 if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) {
603 broker.addSession(cs.getContext(), info);
604 try {
605 cs.addSession(info);
606 } catch (IllegalStateException e) {
607 e.printStackTrace();
608 broker.removeSession(cs.getContext(), info);
609 }
610 }
611 return null;
612 }
613
614 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception {
615 ConnectionId connectionId = id.getParentId();
616 TransportConnectionState cs = lookupConnectionState(connectionId);
617 if (cs == null) {
618 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId);
619 }
620 SessionState session = cs.getSessionState(id);
621 if (session == null) {
622 throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
623 }
624 // Don't let new consumers or producers get added while we are closing
625 // this down.
626 session.shutdown();
627 // Cascade the connection stop to the consumers and producers.
628 for (ConsumerId consumerId : session.getConsumerIds()) {
629 try {
630 processRemoveConsumer(consumerId, lastDeliveredSequenceId);
631 } catch (Throwable e) {
632 LOG.warn("Failed to remove consumer: " + consumerId + ". Reason: " + e, e);
633 }
634 }
635 for (ProducerId producerId : session.getProducerIds()) {
636 try {
637 processRemoveProducer(producerId);
638 } catch (Throwable e) {
639 LOG.warn("Failed to remove producer: " + producerId + ". Reason: " + e, e);
640 }
641 }
642 cs.removeSession(id);
643 broker.removeSession(cs.getContext(), session.getInfo());
644 return null;
645 }
646
647 public Response processAddConnection(ConnectionInfo info) throws Exception {
648 // if the broker service has slave attached, wait for the slave to be
649 // attached to allow client connection. slave connection is fine
650 if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
651 && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
652 ServiceSupport.dispose(transport);
653 return new ExceptionResponse(new Exception("Master's slave not attached yet."));
654 }
655 // Older clients should have been defaulting this field to true.. but
656 // they were not.
657 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
658 info.setClientMaster(true);
659 }
660 TransportConnectionState state;
661 // Make sure 2 concurrent connections by the same ID only generate 1
662 // TransportConnectionState object.
663 synchronized (brokerConnectionStates) {
664 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId());
665 if (state == null) {
666 state = new TransportConnectionState(info, this);
667 brokerConnectionStates.put(info.getConnectionId(), state);
668 }
669 state.incrementReference();
670 }
671 // If there are 2 concurrent connections for the same connection id,
672 // then last one in wins, we need to sync here
673 // to figure out the winner.
674 synchronized (state.getConnectionMutex()) {
675 if (state.getConnection() != this) {
676 LOG.debug("Killing previous stale connection: " + state.getConnection().getRemoteAddress());
677 state.getConnection().stop();
678 LOG.debug("Connection " + getRemoteAddress() + " taking over previous connection: "
679 + state.getConnection().getRemoteAddress());
680 state.setConnection(this);
681 state.reset(info);
682 }
683 }
684 registerConnectionState(info.getConnectionId(), state);
685 LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress() + ", info: " + info);
686 this.faultTolerantConnection = info.isFaultTolerant();
687 // Setup the context.
688 String clientId = info.getClientId();
689 context = new ConnectionContext();
690 context.setBroker(broker);
691 context.setClientId(clientId);
692 context.setClientMaster(info.isClientMaster());
693 context.setConnection(this);
694 context.setConnectionId(info.getConnectionId());
695 context.setConnector(connector);
696 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy());
697 context.setNetworkConnection(networkConnection);
698 context.setFaultTolerant(faultTolerantConnection);
699 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>());
700 context.setUserName(info.getUserName());
701 context.setWireFormatInfo(wireFormatInfo);
702 context.setReconnect(info.isFailoverReconnect());
703 this.manageable = info.isManageable();
704 context.setConnectionState(state);
705 state.setContext(context);
706 state.setConnection(this);
707 if (info.getClientIp() == null) {
708 info.setClientIp(getRemoteAddress());
709 }
710
711 try {
712 broker.addConnection(context, info);
713 } catch (Exception e) {
714 synchronized (brokerConnectionStates) {
715 brokerConnectionStates.remove(info.getConnectionId());
716 }
717 unregisterConnectionState(info.getConnectionId());
718 LOG.warn("Failed to add Connection " + info.getConnectionId() + ", reason: " + e.toString());
719 if (LOG.isDebugEnabled()) {
720 LOG.debug("Exception detail:", e);
721 }
722 if (e instanceof SecurityException) {
723 // close this down - in case the peer of this transport doesn't play nice
724 delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e);
725 }
726 throw e;
727 }
728 if (info.isManageable()) {
729 // send ConnectionCommand
730 ConnectionControl command = this.connector.getConnectionControl();
731 command.setFaultTolerant(broker.isFaultTolerantConfiguration());
732 if (info.isFailoverReconnect()) {
733 command.setRebalanceConnection(false);
734 }
735 dispatchAsync(command);
736 }
737 return null;
738 }
739
740 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId)
741 throws InterruptedException {
742 LOG.debug("remove connection id: " + id);
743 TransportConnectionState cs = lookupConnectionState(id);
744 if (cs != null) {
745 // Don't allow things to be added to the connection state while we
746 // are shutting down.
747 cs.shutdown();
748 // Cascade the connection stop to the sessions.
749 for (SessionId sessionId : cs.getSessionIds()) {
750 try {
751 processRemoveSession(sessionId, lastDeliveredSequenceId);
752 } catch (Throwable e) {
753 SERVICELOG.warn("Failed to remove session " + sessionId, e);
754 }
755 }
756 // Cascade the connection stop to temp destinations.
757 for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) {
758 DestinationInfo di = iter.next();
759 try {
760 broker.removeDestination(cs.getContext(), di.getDestination(), 0);
761 } catch (Throwable e) {
762 SERVICELOG.warn("Failed to remove tmp destination " + di.getDestination(), e);
763 }
764 iter.remove();
765 }
766 try {
767 broker.removeConnection(cs.getContext(), cs.getInfo(), null);
768 } catch (Throwable e) {
769 SERVICELOG.warn("Failed to remove connection " + cs.getInfo() + ", reason: " + e.toString());
770 if (LOG.isDebugEnabled()) {
771 SERVICELOG.debug("Exception detail:", e);
772 }
773 }
774 TransportConnectionState state = unregisterConnectionState(id);
775 if (state != null) {
776 synchronized (brokerConnectionStates) {
777 // If we are the last reference, we should remove the state
778 // from the broker.
779 if (state.decrementReference() == 0) {
780 brokerConnectionStates.remove(id);
781 }
782 }
783 }
784 }
785 return null;
786 }
787
788 public Response processProducerAck(ProducerAck ack) throws Exception {
789 // A broker should not get ProducerAck messages.
790 return null;
791 }
792
793 public Connector getConnector() {
794 return connector;
795 }
796
797 public void dispatchSync(Command message) {
798 try {
799 processDispatch(message);
800 } catch (IOException e) {
801 serviceExceptionAsync(e);
802 }
803 }
804
805 public void dispatchAsync(Command message) {
806 if (!stopping.get()) {
807 if (taskRunner == null) {
808 dispatchSync(message);
809 } else {
810 synchronized (dispatchQueue) {
811 dispatchQueue.add(message);
812 }
813 try {
814 taskRunner.wakeup();
815 } catch (InterruptedException e) {
816 Thread.currentThread().interrupt();
817 }
818 }
819 } else {
820 if (message.isMessageDispatch()) {
821 MessageDispatch md = (MessageDispatch) message;
822 Runnable sub = md.getTransmitCallback();
823 broker.postProcessDispatch(md);
824 if (sub != null) {
825 sub.run();
826 }
827 }
828 }
829 }
830
831 protected void processDispatch(Command command) throws IOException {
832 final MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null);
833 try {
834 if (!stopping.get()) {
835 if (messageDispatch != null) {
836 broker.preProcessDispatch(messageDispatch);
837 }
838 dispatch(command);
839 }
840 } finally {
841 if (messageDispatch != null) {
842 Runnable sub = messageDispatch.getTransmitCallback();
843 broker.postProcessDispatch(messageDispatch);
844 if (sub != null) {
845 sub.run();
846 }
847 }
848 }
849 }
850
851 public boolean iterate() {
852 try {
853 if (pendingStop || stopping.get()) {
854 if (dispatchStopped.compareAndSet(false, true)) {
855 if (transportException.get() == null) {
856 try {
857 dispatch(new ShutdownInfo());
858 } catch (Throwable ignore) {
859 }
860 }
861 dispatchStoppedLatch.countDown();
862 }
863 return false;
864 }
865 if (!dispatchStopped.get()) {
866 Command command = null;
867 synchronized (dispatchQueue) {
868 if (dispatchQueue.isEmpty()) {
869 return false;
870 }
871 command = dispatchQueue.remove(0);
872 }
873 processDispatch(command);
874 return true;
875 }
876 return false;
877 } catch (IOException e) {
878 if (dispatchStopped.compareAndSet(false, true)) {
879 dispatchStoppedLatch.countDown();
880 }
881 serviceExceptionAsync(e);
882 return false;
883 }
884 }
885
886 /**
887 * Returns the statistics for this connection
888 */
889 public ConnectionStatistics getStatistics() {
890 return statistics;
891 }
892
893 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
894 return messageAuthorizationPolicy;
895 }
896
897 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
898 this.messageAuthorizationPolicy = messageAuthorizationPolicy;
899 }
900
901 public boolean isManageable() {
902 return manageable;
903 }
904
905 public void start() throws Exception {
906 try {
907 synchronized (this) {
908 starting = true;
909 if (taskRunnerFactory != null) {
910 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
911 + getRemoteAddress());
912 } else {
913 taskRunner = null;
914 }
915 transport.start();
916 active = true;
917 BrokerInfo info = connector.getBrokerInfo().copy();
918 if (connector.isUpdateClusterClients()) {
919 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
920 } else {
921 info.setPeerBrokerInfos(null);
922 }
923 dispatchAsync(info);
924
925 connector.onStarted(this);
926 }
927 } catch (Exception e) {
928 // Force clean up on an error starting up.
929 pendingStop = true;
930 throw e;
931 } finally {
932 // stop() can be called from within the above block,
933 // but we want to be sure start() completes before
934 // stop() runs, so queue the stop until right now:
935 setStarting(false);
936 if (isPendingStop()) {
937 LOG.debug("Calling the delayed stop() after start() " + this);
938 stop();
939 }
940 }
941 }
942
943 public void stop() throws Exception {
944 // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory)
945 // as their lifecycle is handled elsewhere
946
947 stopAsync();
948 while (!stopped.await(5, TimeUnit.SECONDS)) {
949 LOG.info("The connection to '" + transport.getRemoteAddress() + "' is taking a long time to shutdown.");
950 }
951 }
952
953 public void delayedStop(final int waitTime, final String reason, Throwable cause) {
954 if (waitTime > 0) {
955 synchronized (this) {
956 pendingStop = true;
957 stopError = cause;
958 }
959 try {
960 stopTaskRunnerFactory.execute(new Runnable() {
961 public void run() {
962 try {
963 Thread.sleep(waitTime);
964 stopAsync();
965 LOG.info("Stopping " + transport.getRemoteAddress() + " because " + reason);
966 } catch (InterruptedException e) {
967 }
968 }
969 });
970 } catch (Throwable t) {
971 LOG.warn("Cannot create stopAsync. This exception will be ignored.", t);
972 }
973 }
974 }
975
976 public void stopAsync() {
977 // If we're in the middle of starting then go no further... for now.
978 synchronized (this) {
979 pendingStop = true;
980 if (starting) {
981 LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
982 return;
983 }
984 }
985 if (stopping.compareAndSet(false, true)) {
986 // Let all the connection contexts know we are shutting down
987 // so that in progress operations can notice and unblock.
988 List<TransportConnectionState> connectionStates = listConnectionStates();
989 for (TransportConnectionState cs : connectionStates) {
990 ConnectionContext connectionContext = cs.getContext();
991 if (connectionContext != null) {
992 connectionContext.getStopping().set(true);
993 }
994 }
995 try {
996 stopTaskRunnerFactory.execute(new Runnable() {
997 public void run() {
998 serviceLock.writeLock().lock();
999 try {
1000 doStop();
1001 } catch (Throwable e) {
1002 LOG.debug("Error occurred while shutting down a connection " + this, e);
1003 } finally {
1004 stopped.countDown();
1005 serviceLock.writeLock().unlock();
1006 }
1007 }
1008 });
1009 } catch (Throwable t) {
1010 LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t);
1011 stopped.countDown();
1012 }
1013 }
1014 }
1015
1016 @Override
1017 public String toString() {
1018 return "Transport Connection to: " + transport.getRemoteAddress();
1019 }
1020
1021 protected void doStop() throws Exception {
1022 LOG.debug("Stopping connection: {}", transport.getRemoteAddress());
1023 connector.onStopped(this);
1024 try {
1025 synchronized (this) {
1026 if (masterBroker != null) {
1027 masterBroker.stop();
1028 }
1029 if (duplexBridge != null) {
1030 duplexBridge.stop();
1031 }
1032 }
1033 } catch (Exception ignore) {
1034 LOG.trace("Exception caught stopping. This exception is ignored.", ignore);
1035 }
1036 try {
1037 transport.stop();
1038 LOG.debug("Stopped transport: " + transport.getRemoteAddress());
1039 } catch (Exception e) {
1040 LOG.debug("Could not stop transport to " + transport.getRemoteAddress() + ". This exception is ignored.", e);
1041 }
1042 if (taskRunner != null) {
1043 taskRunner.shutdown(1);
1044 taskRunner = null;
1045 }
1046 active = false;
1047 // Run the MessageDispatch callbacks so that message references get
1048 // cleaned up.
1049 synchronized (dispatchQueue) {
1050 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) {
1051 Command command = iter.next();
1052 if (command.isMessageDispatch()) {
1053 MessageDispatch md = (MessageDispatch) command;
1054 Runnable sub = md.getTransmitCallback();
1055 broker.postProcessDispatch(md);
1056 if (sub != null) {
1057 sub.run();
1058 }
1059 }
1060 }
1061 dispatchQueue.clear();
1062 }
1063 //
1064 // Remove all logical connection associated with this connection
1065 // from the broker.
1066 if (!broker.isStopped()) {
1067 List<TransportConnectionState> connectionStates = listConnectionStates();
1068 connectionStates = listConnectionStates();
1069 for (TransportConnectionState cs : connectionStates) {
1070 cs.getContext().getStopping().set(true);
1071 try {
1072 LOG.debug("Cleaning up connection resources: {}", getRemoteAddress());
1073 processRemoveConnection(cs.getInfo().getConnectionId(), 0l);
1074 } catch (Throwable ignore) {
1075 ignore.printStackTrace();
1076 }
1077 }
1078 }
1079 LOG.debug("Connection Stopped: {}", getRemoteAddress());
1080 }
1081
1082 /**
1083 * @return Returns the blockedCandidate.
1084 */
1085 public boolean isBlockedCandidate() {
1086 return blockedCandidate;
1087 }
1088
1089 /**
1090 * @param blockedCandidate The blockedCandidate to set.
1091 */
1092 public void setBlockedCandidate(boolean blockedCandidate) {
1093 this.blockedCandidate = blockedCandidate;
1094 }
1095
1096 /**
1097 * @return Returns the markedCandidate.
1098 */
1099 public boolean isMarkedCandidate() {
1100 return markedCandidate;
1101 }
1102
1103 /**
1104 * @param markedCandidate The markedCandidate to set.
1105 */
1106 public void setMarkedCandidate(boolean markedCandidate) {
1107 this.markedCandidate = markedCandidate;
1108 if (!markedCandidate) {
1109 timeStamp = 0;
1110 blockedCandidate = false;
1111 }
1112 }
1113
1114 /**
1115 * @param slow The slow to set.
1116 */
1117 public void setSlow(boolean slow) {
1118 this.slow = slow;
1119 }
1120
1121 /**
1122 * @return true if the Connection is slow
1123 */
1124 public boolean isSlow() {
1125 return slow;
1126 }
1127
1128 /**
1129 * @return true if the Connection is potentially blocked
1130 */
1131 public boolean isMarkedBlockedCandidate() {
1132 return markedCandidate;
1133 }
1134
1135 /**
1136 * Mark the Connection, so we can deem if it's collectable on the next sweep
1137 */
1138 public void doMark() {
1139 if (timeStamp == 0) {
1140 timeStamp = System.currentTimeMillis();
1141 }
1142 }
1143
1144 /**
1145 * @return if after being marked, the Connection is still writing
1146 */
1147 public boolean isBlocked() {
1148 return blocked;
1149 }
1150
1151 /**
1152 * @return true if the Connection is connected
1153 */
1154 public boolean isConnected() {
1155 return connected;
1156 }
1157
1158 /**
1159 * @param blocked The blocked to set.
1160 */
1161 public void setBlocked(boolean blocked) {
1162 this.blocked = blocked;
1163 }
1164
1165 /**
1166 * @param connected The connected to set.
1167 */
1168 public void setConnected(boolean connected) {
1169 this.connected = connected;
1170 }
1171
1172 /**
1173 * @return true if the Connection is active
1174 */
1175 public boolean isActive() {
1176 return active;
1177 }
1178
1179 /**
1180 * @param active The active to set.
1181 */
1182 public void setActive(boolean active) {
1183 this.active = active;
1184 }
1185
1186 /**
1187 * @return true if the Connection is starting
1188 */
1189 public synchronized boolean isStarting() {
1190 return starting;
1191 }
1192
1193 public synchronized boolean isNetworkConnection() {
1194 return networkConnection;
1195 }
1196
1197 public boolean isFaultTolerantConnection() {
1198 return this.faultTolerantConnection;
1199 }
1200
1201 protected synchronized void setStarting(boolean starting) {
1202 this.starting = starting;
1203 }
1204
1205 /**
1206 * @return true if the Connection needs to stop
1207 */
1208 public synchronized boolean isPendingStop() {
1209 return pendingStop;
1210 }
1211
1212 protected synchronized void setPendingStop(boolean pendingStop) {
1213 this.pendingStop = pendingStop;
1214 }
1215
/**
 * Handles a {@link BrokerInfo} command sent by a peer broker. Three cases are
 * distinguished:
 * <ul>
 * <li><b>Slave broker</b>: unless the slave is passive, a {@link MasterBroker}
 * is created over this connection's transport and started; the broker service
 * is told that a slave connection was established. Processing then continues
 * with the common tail below.</li>
 * <li><b>Duplex network connection</b>: the responder end of a duplex network
 * bridge is created and started. Every path through this branch returns
 * without reaching the common tail, so the {@code brokerInfo} field is not
 * updated from this command.</li>
 * <li><b>Anything else</b>: only the common tail runs.</li>
 * </ul>
 * The common tail stores {@code info} in the {@code brokerInfo} field (warning if
 * one was already stored), flags this connection as a network connection and
 * propagates that flag to the context of every registered connection state.
 *
 * @param info the broker info command received
 * @return always {@code null}
 */
public Response processBrokerInfo(BrokerInfo info) {
    if (info.isSlaveBroker()) {
        BrokerService bService = connector.getBrokerService();
        // Do we only support passive slaves - or does the slave want to be
        // passive ?
        boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
        if (passive == false) {

            // stream messages from this broker (the master) to
            // the slave
            MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
            masterBroker = new MasterBroker(parent, transport);
            masterBroker.startProcessing();
        }
        LOG.info((passive ? "Passive" : "Active") + " Slave Broker " + info.getBrokerName() + " is attached");
        bService.slaveConnectionEstablished();
    } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
        // so this TransportConnection is the rear end of a network bridge
        // We have been requested to create a two way pipe ...
        try {
            // Rebuild the remote side's bridge configuration from the properties
            // string carried in the command.
            Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
            Map<String, String> props = createMap(properties);
            NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
            IntrospectionSupport.setProperties(config, props, "");
            config.setBrokerName(broker.getBrokerName());

            // check for existing duplex connection hanging about

            // We first look if existing network connection already exists for the same broker Id and network connector name
            // It's possible in case of brief network fault to have this transport connector side of the connection always active
            // and the duplex network connector side wanting to open a new one
            // In this case, the old connection must be broken
            String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
            CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
            // NOTE(review): the wait of up to one second per matching connection
            // below happens while holding the monitor of the connections list.
            synchronized (connections) {
                for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) {
                    TransportConnection c = iter.next();
                    if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
                        LOG.warn("Stopping an existing active duplex connection [" + c + "] for network connector (" + duplexNetworkConnectorId + ").");
                        c.stopAsync();
                        // better to wait for a bit rather than get connection id already in use and failure to start new bridge
                        c.getStopped().await(1, TimeUnit.SECONDS);
                    }
                }
                // Claim the id for this connection only after older holders were asked to stop.
                setDuplexNetworkConnectorId(duplexNetworkConnectorId);
            }
            // Open a local VM transport to this broker, forcing the "network" and
            // "async" URI parameters.
            URI uri = broker.getVmConnectorURI();
            HashMap<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
            map.put("network", "true");
            map.put("async", "false");
            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
            Transport localTransport = TransportFactory.connect(uri);
            Transport remoteBridgeTransport = new ResponseCorrelator(transport);
            // Use the part of the local transport's description from the last '#'
            // onwards (if any) as the name for the listener's object name.
            String duplexName = localTransport.toString();
            if (duplexName.contains("#")) {
                duplexName = duplexName.substring(duplexName.lastIndexOf("#"));
            }
            MBeanNetworkListener listener = new MBeanNetworkListener(broker.getBrokerService(), broker.getBrokerService().createDuplexNetworkConnectorObjectName(duplexName));
            listener.setCreatedByDuplex(true);
            duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
            duplexBridge.setBrokerService(broker.getBrokerService());
            // now turn duplex off this side
            info.setDuplexConnection(false);
            duplexBridge.setCreatedByDuplex(true);
            // Note: the second argument is the brokerInfo field, not the incoming command.
            duplexBridge.duplexStart(this, brokerInfo, info);
            LOG.info("Started responder end of duplex bridge " + duplexNetworkConnectorId);
            return null;
        } catch (TransportDisposedIOException e) {
            // NOTE(review): the local duplexNetworkConnectorId declared inside the try
            // block is out of scope here, so this name resolves to the field, which is
            // only set once setDuplexNetworkConnectorId(...) above has run.
            LOG.warn("Duplex bridge " + duplexNetworkConnectorId + " was stopped before it was correctly started.");
            return null;
        } catch (Exception e) {
            // NOTE(review): same scoping as above - the id logged is the field value.
            // This handler also receives an InterruptedException thrown by await(...).
            LOG.error("Failed to create responder end of duplex network bridge " + duplexNetworkConnectorId, e);
            return null;
        }
    }
    // We only expect to get one broker info command per connection
    if (this.brokerInfo != null) {
        LOG.warn("Unexpected extra broker info command received: " + info);
    }
    this.brokerInfo = info;
    networkConnection = true;
    // Propagate the network-connection flag to every registered connection context.
    List<TransportConnectionState> connectionStates = listConnectionStates();
    for (TransportConnectionState cs : connectionStates) {
        cs.getContext().setNetworkConnection(true);
    }
    return null;
}
1303
1304 @SuppressWarnings({"unchecked", "rawtypes"})
1305 private HashMap<String, String> createMap(Properties properties) {
1306 return new HashMap(properties);
1307 }
1308
1309 protected void dispatch(Command command) throws IOException {
1310 try {
1311 setMarkedCandidate(true);
1312 transport.oneway(command);
1313 } finally {
1314 setMarkedCandidate(false);
1315 }
1316 }
1317
1318 public String getRemoteAddress() {
1319 return transport.getRemoteAddress();
1320 }
1321
1322 public String getConnectionId() {
1323 List<TransportConnectionState> connectionStates = listConnectionStates();
1324 for (TransportConnectionState cs : connectionStates) {
1325 if (cs.getInfo().getClientId() != null) {
1326 return cs.getInfo().getClientId();
1327 }
1328 return cs.getInfo().getConnectionId().toString();
1329 }
1330 return null;
1331 }
1332
1333 public void updateClient(ConnectionControl control) {
1334 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
1335 && this.wireFormatInfo.getVersion() >= 6) {
1336 dispatchAsync(control);
1337 }
1338 }
1339
1340 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
1341 ProducerBrokerExchange result = producerExchanges.get(id);
1342 if (result == null) {
1343 synchronized (producerExchanges) {
1344 result = new ProducerBrokerExchange();
1345 TransportConnectionState state = lookupConnectionState(id);
1346 context = state.getContext();
1347 result.setConnectionContext(context);
1348 if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) {
1349 result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
1350 }
1351 SessionState ss = state.getSessionState(id.getParentId());
1352 if (ss != null) {
1353 result.setProducerState(ss.getProducerState(id));
1354 ProducerState producerState = ss.getProducerState(id);
1355 if (producerState != null && producerState.getInfo() != null) {
1356 ProducerInfo info = producerState.getInfo();
1357 result.setMutable(info.getDestination() == null || info.getDestination().isComposite());
1358 }
1359 }
1360 producerExchanges.put(id, result);
1361 }
1362 } else {
1363 context = result.getConnectionContext();
1364 }
1365 return result;
1366 }
1367
1368 private void removeProducerBrokerExchange(ProducerId id) {
1369 synchronized (producerExchanges) {
1370 producerExchanges.remove(id);
1371 }
1372 }
1373
1374 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
1375 ConsumerBrokerExchange result = consumerExchanges.get(id);
1376 return result;
1377 }
1378
1379 private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
1380 ConsumerBrokerExchange result = consumerExchanges.get(id);
1381 if (result == null) {
1382 synchronized (consumerExchanges) {
1383 result = new ConsumerBrokerExchange();
1384 TransportConnectionState state = lookupConnectionState(id);
1385 context = state.getContext();
1386 result.setConnectionContext(context);
1387 SessionState ss = state.getSessionState(id.getParentId());
1388 if (ss != null) {
1389 ConsumerState cs = ss.getConsumerState(id);
1390 if (cs != null) {
1391 ConsumerInfo info = cs.getInfo();
1392 if (info != null) {
1393 if (info.getDestination() != null && info.getDestination().isPattern()) {
1394 result.setWildcard(true);
1395 }
1396 }
1397 }
1398 }
1399 consumerExchanges.put(id, result);
1400 }
1401 }
1402 return result;
1403 }
1404
1405 private void removeConsumerBrokerExchange(ConsumerId id) {
1406 synchronized (consumerExchanges) {
1407 consumerExchanges.remove(id);
1408 }
1409 }
1410
1411 public int getProtocolVersion() {
1412 return protocolVersion.get();
1413 }
1414
1415 public Response processControlCommand(ControlCommand command) throws Exception {
1416 String control = command.getCommand();
1417 if (control != null && control.equals("shutdown")) {
1418 System.exit(0);
1419 }
1420 return null;
1421 }
1422
1423 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception {
1424 return null;
1425 }
1426
1427 public Response processConnectionControl(ConnectionControl control) throws Exception {
1428 if (control != null) {
1429 faultTolerantConnection = control.isFaultTolerant();
1430 }
1431 return null;
1432 }
1433
1434 public Response processConnectionError(ConnectionError error) throws Exception {
1435 return null;
1436 }
1437
1438 public Response processConsumerControl(ConsumerControl control) throws Exception {
1439 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId());
1440 broker.processConsumerControl(consumerExchange, control);
1441 return null;
1442 }
1443
1444 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId,
1445 TransportConnectionState state) {
1446 TransportConnectionState cs = null;
1447 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) {
1448 // swap implementations
1449 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister();
1450 newRegister.intialize(connectionStateRegister);
1451 connectionStateRegister = newRegister;
1452 }
1453 cs = connectionStateRegister.registerConnectionState(connectionId, state);
1454 return cs;
1455 }
1456
1457 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) {
1458 return connectionStateRegister.unregisterConnectionState(connectionId);
1459 }
1460
1461 protected synchronized List<TransportConnectionState> listConnectionStates() {
1462 return connectionStateRegister.listConnectionStates();
1463 }
1464
1465 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) {
1466 return connectionStateRegister.lookupConnectionState(connectionId);
1467 }
1468
1469 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) {
1470 return connectionStateRegister.lookupConnectionState(id);
1471 }
1472
1473 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) {
1474 return connectionStateRegister.lookupConnectionState(id);
1475 }
1476
1477 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) {
1478 return connectionStateRegister.lookupConnectionState(id);
1479 }
1480
1481 protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
1482 return connectionStateRegister.lookupConnectionState(connectionId);
1483 }
1484
1485 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) {
1486 this.duplexNetworkConnectorId = duplexNetworkConnectorId;
1487 }
1488
1489 protected synchronized String getDuplexNetworkConnectorId() {
1490 return this.duplexNetworkConnectorId;
1491 }
1492
1493 public boolean isStopping() {
1494 return stopping.get();
1495 }
1496
1497 protected CountDownLatch getStopped() {
1498 return stopped;
1499 }
1500
1501 private int getProducerCount(ConnectionId connectionId) {
1502 int result = 0;
1503 TransportConnectionState cs = lookupConnectionState(connectionId);
1504 if (cs != null) {
1505 for (SessionId sessionId : cs.getSessionIds()) {
1506 SessionState sessionState = cs.getSessionState(sessionId);
1507 if (sessionState != null) {
1508 result += sessionState.getProducerIds().size();
1509 }
1510 }
1511 }
1512 return result;
1513 }
1514
1515 private int getConsumerCount(ConnectionId connectionId) {
1516 int result = 0;
1517 TransportConnectionState cs = lookupConnectionState(connectionId);
1518 if (cs != null) {
1519 for (SessionId sessionId : cs.getSessionIds()) {
1520 SessionState sessionState = cs.getSessionState(sessionId);
1521 if (sessionState != null) {
1522 result += sessionState.getConsumerIds().size();
1523 }
1524 }
1525 }
1526 return result;
1527 }
1528 }