001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.io.IOException;
020 import java.security.GeneralSecurityException;
021 import java.security.cert.X509Certificate;
022 import java.util.Arrays;
023 import java.util.Collection;
024 import java.util.List;
025 import java.util.Properties;
026 import java.util.concurrent.ConcurrentHashMap;
027 import java.util.concurrent.CountDownLatch;
028 import java.util.concurrent.TimeUnit;
029 import java.util.concurrent.atomic.AtomicBoolean;
030 import java.util.concurrent.atomic.AtomicLong;
031
032 import javax.management.ObjectName;
033
034 import org.apache.activemq.Service;
035 import org.apache.activemq.advisory.AdvisorySupport;
036 import org.apache.activemq.broker.BrokerService;
037 import org.apache.activemq.broker.BrokerServiceAware;
038 import org.apache.activemq.broker.TransportConnection;
039 import org.apache.activemq.broker.region.AbstractRegion;
040 import org.apache.activemq.broker.region.DurableTopicSubscription;
041 import org.apache.activemq.broker.region.Region;
042 import org.apache.activemq.broker.region.RegionBroker;
043 import org.apache.activemq.broker.region.Subscription;
044 import org.apache.activemq.broker.region.policy.PolicyEntry;
045 import org.apache.activemq.command.ActiveMQDestination;
046 import org.apache.activemq.command.ActiveMQMessage;
047 import org.apache.activemq.command.ActiveMQTempDestination;
048 import org.apache.activemq.command.ActiveMQTopic;
049 import org.apache.activemq.command.BrokerId;
050 import org.apache.activemq.command.BrokerInfo;
051 import org.apache.activemq.command.Command;
052 import org.apache.activemq.command.ConnectionError;
053 import org.apache.activemq.command.ConnectionId;
054 import org.apache.activemq.command.ConnectionInfo;
055 import org.apache.activemq.command.ConsumerId;
056 import org.apache.activemq.command.ConsumerInfo;
057 import org.apache.activemq.command.DataStructure;
058 import org.apache.activemq.command.DestinationInfo;
059 import org.apache.activemq.command.ExceptionResponse;
060 import org.apache.activemq.command.KeepAliveInfo;
061 import org.apache.activemq.command.Message;
062 import org.apache.activemq.command.MessageAck;
063 import org.apache.activemq.command.MessageDispatch;
064 import org.apache.activemq.command.NetworkBridgeFilter;
065 import org.apache.activemq.command.ProducerInfo;
066 import org.apache.activemq.command.RemoveInfo;
067 import org.apache.activemq.command.Response;
068 import org.apache.activemq.command.SessionInfo;
069 import org.apache.activemq.command.ShutdownInfo;
070 import org.apache.activemq.command.WireFormatInfo;
071 import org.apache.activemq.filter.DestinationFilter;
072 import org.apache.activemq.filter.MessageEvaluationContext;
073 import org.apache.activemq.transport.DefaultTransportListener;
074 import org.apache.activemq.transport.FutureResponse;
075 import org.apache.activemq.transport.ResponseCallback;
076 import org.apache.activemq.transport.Transport;
077 import org.apache.activemq.transport.TransportDisposedIOException;
078 import org.apache.activemq.transport.TransportFilter;
079 import org.apache.activemq.transport.tcp.SslTransport;
080 import org.apache.activemq.util.IdGenerator;
081 import org.apache.activemq.util.IntrospectionSupport;
082 import org.apache.activemq.util.LongSequenceGenerator;
083 import org.apache.activemq.util.MarshallingSupport;
084 import org.apache.activemq.util.ServiceStopper;
085 import org.apache.activemq.util.ServiceSupport;
086 import org.slf4j.Logger;
087 import org.slf4j.LoggerFactory;
088
089 /**
090 * A useful base class for implementing demand forwarding bridges.
091 */
092 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
093 private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
094 protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
095 protected final Transport localBroker;
096 protected final Transport remoteBroker;
097 protected final IdGenerator idGenerator = new IdGenerator();
098 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
099 protected ConnectionInfo localConnectionInfo;
100 protected ConnectionInfo remoteConnectionInfo;
101 protected SessionInfo localSessionInfo;
102 protected ProducerInfo producerInfo;
103 protected String remoteBrokerName = "Unknown";
104 protected String localClientId;
105 protected ConsumerInfo demandConsumerInfo;
106 protected int demandConsumerDispatched;
107 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
108 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
109 protected AtomicBoolean disposed = new AtomicBoolean();
110 protected BrokerId localBrokerId;
111 protected ActiveMQDestination[] excludedDestinations;
112 protected ActiveMQDestination[] dynamicallyIncludedDestinations;
113 protected ActiveMQDestination[] staticallyIncludedDestinations;
114 protected ActiveMQDestination[] durableDestinations;
115 protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
116 protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
117 protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
118 protected final CountDownLatch startedLatch = new CountDownLatch(2);
119 protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
120 protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
121 protected NetworkBridgeConfiguration configuration;
122 protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
123
124 protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
125 protected Object brokerInfoMutex = new Object();
126 protected BrokerId remoteBrokerId;
127
128 final AtomicLong enqueueCounter = new AtomicLong();
129 final AtomicLong dequeueCounter = new AtomicLong();
130
131 private NetworkBridgeListener networkBridgeListener;
132 private boolean createdByDuplex;
133 private BrokerInfo localBrokerInfo;
134 private BrokerInfo remoteBrokerInfo;
135
136 private final AtomicBoolean started = new AtomicBoolean();
137 private TransportConnection duplexInitiatingConnection;
138 private BrokerService brokerService = null;
139 private ObjectName mbeanObjectName;
140
141 public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
142 this.configuration = configuration;
143 this.localBroker = localBroker;
144 this.remoteBroker = remoteBroker;
145 }
146
147 public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
148 this.localBrokerInfo = localBrokerInfo;
149 this.remoteBrokerInfo = remoteBrokerInfo;
150 this.duplexInitiatingConnection = connection;
151 start();
152 serviceRemoteCommand(remoteBrokerInfo);
153 }
154
155 public void start() throws Exception {
156 if (started.compareAndSet(false, true)) {
157
158 if (brokerService == null) {
159 throw new IllegalArgumentException("BrokerService is null on " + this);
160 }
161
162 localBroker.setTransportListener(new DefaultTransportListener() {
163
164 @Override
165 public void onCommand(Object o) {
166 Command command = (Command) o;
167 serviceLocalCommand(command);
168 }
169
170 @Override
171 public void onException(IOException error) {
172 serviceLocalException(error);
173 }
174 });
175 remoteBroker.setTransportListener(new DefaultTransportListener() {
176
177 public void onCommand(Object o) {
178 Command command = (Command) o;
179 serviceRemoteCommand(command);
180 }
181
182 public void onException(IOException error) {
183 serviceRemoteException(error);
184 }
185
186 });
187
188 localBroker.start();
189 remoteBroker.start();
190 if (!disposed.get()) {
191 try {
192 triggerRemoteStartBridge();
193 } catch (IOException e) {
194 LOG.warn("Caught exception from remote start", e);
195 }
196 } else {
197 LOG.warn ("Bridge was disposed before the start() method was fully executed.");
198 throw new TransportDisposedIOException();
199 }
200 }
201 }
202
203 protected void triggerLocalStartBridge() throws IOException {
204 brokerService.getTaskRunnerFactory().execute(new Runnable() {
205 public void run() {
206 final String originalName = Thread.currentThread().getName();
207 Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
208 try {
209 startLocalBridge();
210 } catch (Throwable e) {
211 serviceLocalException(e);
212 } finally {
213 Thread.currentThread().setName(originalName);
214 }
215 }
216 });
217 }
218
219 protected void triggerRemoteStartBridge() throws IOException {
220 brokerService.getTaskRunnerFactory().execute(new Runnable() {
221 public void run() {
222 final String originalName = Thread.currentThread().getName();
223 Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
224 try {
225 startRemoteBridge();
226 } catch (Exception e) {
227 serviceRemoteException(e);
228 } finally {
229 Thread.currentThread().setName(originalName);
230 }
231 }
232 });
233 }
234
235 private void startLocalBridge() throws Throwable {
236 if (localBridgeStarted.compareAndSet(false, true)) {
237 synchronized (this) {
238 if (LOG.isTraceEnabled()) {
239 LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker);
240 }
241 if (!disposed.get()) {
242 localConnectionInfo = new ConnectionInfo();
243 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
244 localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
245 localConnectionInfo.setClientId(localClientId);
246 localConnectionInfo.setUserName(configuration.getUserName());
247 localConnectionInfo.setPassword(configuration.getPassword());
248 Transport originalTransport = remoteBroker;
249 while (originalTransport instanceof TransportFilter) {
250 originalTransport = ((TransportFilter) originalTransport).getNext();
251 }
252 if (originalTransport instanceof SslTransport) {
253 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
254 localConnectionInfo.setTransportContext(peerCerts);
255 }
256 // sync requests that may fail
257 Object resp = localBroker.request(localConnectionInfo);
258 if (resp instanceof ExceptionResponse) {
259 throw ((ExceptionResponse)resp).getException();
260 }
261 localSessionInfo = new SessionInfo(localConnectionInfo, 1);
262 localBroker.oneway(localSessionInfo);
263
264 brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
265 NetworkBridgeListener l = this.networkBridgeListener;
266 if (l != null) {
267 l.onStart(this);
268 }
269 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
270
271 } else {
272 LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed.");
273 }
274 startedLatch.countDown();
275 localStartedLatch.countDown();
276 }
277
278 safeWaitUntilStarted();
279
280 if (!disposed.get()) {
281 setupStaticDestinations();
282 } else {
283 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment.");
284 }
285 }
286 }
287
288 protected void startRemoteBridge() throws Exception {
289 if (remoteBridgeStarted.compareAndSet(false, true)) {
290 if (LOG.isTraceEnabled()) {
291 LOG.trace(configuration.getBrokerName() + " starting remote Bridge, remoteBroker=" + remoteBroker);
292 }
293 synchronized (this) {
294 if (!isCreatedByDuplex()) {
295 BrokerInfo brokerInfo = new BrokerInfo();
296 brokerInfo.setBrokerName(configuration.getBrokerName());
297 brokerInfo.setBrokerURL(configuration.getBrokerURL());
298 brokerInfo.setNetworkConnection(true);
299 brokerInfo.setDuplexConnection(configuration.isDuplex());
300 // set our properties
301 Properties props = new Properties();
302 IntrospectionSupport.getProperties(configuration, props, null);
303 String str = MarshallingSupport.propertiesToString(props);
304 brokerInfo.setNetworkProperties(str);
305 brokerInfo.setBrokerId(this.localBrokerId);
306 remoteBroker.oneway(brokerInfo);
307 }
308 if (remoteConnectionInfo != null) {
309 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
310 }
311 remoteConnectionInfo = new ConnectionInfo();
312 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
313 remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
314 remoteConnectionInfo.setUserName(configuration.getUserName());
315 remoteConnectionInfo.setPassword(configuration.getPassword());
316 remoteBroker.oneway(remoteConnectionInfo);
317
318 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
319 remoteBroker.oneway(remoteSessionInfo);
320 producerInfo = new ProducerInfo(remoteSessionInfo, 1);
321 producerInfo.setResponseRequired(false);
322 remoteBroker.oneway(producerInfo);
323 // Listen to consumer advisory messages on the remote broker to
324 // determine demand.
325 if (!configuration.isStaticBridge()) {
326 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
327 // always dispatch advisory message asynchronously so that we never block the producer
328 // broker if we are slow
329 demandConsumerInfo.setDispatchAsync(true);
330 String advisoryTopic = configuration.getDestinationFilter();
331 if (configuration.isBridgeTempDestinations()) {
332 advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
333 }
334 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
335 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
336 remoteBroker.oneway(demandConsumerInfo);
337 }
338 startedLatch.countDown();
339 }
340 }
341 }
342
343 public void stop() throws Exception {
344 if (started.compareAndSet(true, false)) {
345 if (disposed.compareAndSet(false, true)) {
346 LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
347 NetworkBridgeListener l = this.networkBridgeListener;
348 if (l != null) {
349 l.onStop(this);
350 }
351 try {
352 remoteBridgeStarted.set(false);
353 final CountDownLatch sendShutdown = new CountDownLatch(1);
354
355 brokerService.getTaskRunnerFactory().execute(new Runnable() {
356 public void run() {
357 try {
358 localBroker.oneway(new ShutdownInfo());
359 sendShutdown.countDown();
360 remoteBroker.oneway(new ShutdownInfo());
361 } catch (Throwable e) {
362 LOG.debug("Caught exception sending shutdown", e);
363 } finally {
364 sendShutdown.countDown();
365 }
366
367 }
368 }, "ActiveMQ ForwardingBridge StopTask");
369
370 if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
371 LOG.info("Network Could not shutdown in a timely manner");
372 }
373 } finally {
374 ServiceStopper ss = new ServiceStopper();
375 ss.stop(remoteBroker);
376 ss.stop(localBroker);
377 // Release the started Latch since another thread could be
378 // stuck waiting for it to start up.
379 startedLatch.countDown();
380 startedLatch.countDown();
381 localStartedLatch.countDown();
382
383 ss.throwFirstException();
384 }
385 }
386
387 if (remoteBrokerInfo != null) {
388 brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
389 brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
390 LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
391 }
392 }
393 }
394
395 public void serviceRemoteException(Throwable error) {
396 if (!disposed.get()) {
397 if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
398 LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
399 } else {
400 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
401 }
402 LOG.debug("The remote Exception was: " + error, error);
403 brokerService.getTaskRunnerFactory().execute(new Runnable() {
404 public void run() {
405 ServiceSupport.dispose(getControllingService());
406 }
407 });
408 fireBridgeFailed();
409 }
410 }
411
412 protected void serviceRemoteCommand(Command command) {
413 if (!disposed.get()) {
414 try {
415 if (command.isMessageDispatch()) {
416 safeWaitUntilStarted();
417 MessageDispatch md = (MessageDispatch) command;
418 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
419 ackAdvisory(md.getMessage());
420 } else if (command.isBrokerInfo()) {
421 lastConnectSucceeded.set(true);
422 remoteBrokerInfo = (BrokerInfo) command;
423 Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
424 try {
425 IntrospectionSupport.getProperties(configuration, props, null);
426 if (configuration.getExcludedDestinations() != null) {
427 excludedDestinations = configuration.getExcludedDestinations().toArray(
428 new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
429 }
430 if (configuration.getStaticallyIncludedDestinations() != null) {
431 staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
432 new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
433 }
434 if (configuration.getDynamicallyIncludedDestinations() != null) {
435 dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
436 .toArray(
437 new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
438 .size()]);
439 }
440 } catch (Throwable t) {
441 LOG.error("Error mapping remote destinations", t);
442 }
443 serviceRemoteBrokerInfo(command);
444 // Let the local broker know the remote broker's ID.
445 localBroker.oneway(command);
446 // new peer broker (a consumer can work with remote broker also)
447 brokerService.getBroker().addBroker(null, remoteBrokerInfo);
448 } else if (command.getClass() == ConnectionError.class) {
449 ConnectionError ce = (ConnectionError) command;
450 serviceRemoteException(ce.getException());
451 } else {
452 if (isDuplex()) {
453 if (command.isMessage()) {
454 ActiveMQMessage message = (ActiveMQMessage) command;
455 if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
456 || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
457 serviceRemoteConsumerAdvisory(message.getDataStructure());
458 ackAdvisory(message);
459 } else {
460 if (!isPermissableDestination(message.getDestination(), true)) {
461 return;
462 }
463 if (message.isResponseRequired()) {
464 Response reply = new Response();
465 reply.setCorrelationId(message.getCommandId());
466 localBroker.oneway(message);
467 remoteBroker.oneway(reply);
468 } else {
469 localBroker.oneway(message);
470 }
471 }
472 } else {
473 switch (command.getDataStructureType()) {
474 case ConnectionInfo.DATA_STRUCTURE_TYPE:
475 case SessionInfo.DATA_STRUCTURE_TYPE:
476 case ProducerInfo.DATA_STRUCTURE_TYPE:
477 localBroker.oneway(command);
478 break;
479 case MessageAck.DATA_STRUCTURE_TYPE:
480 MessageAck ack = (MessageAck) command;
481 DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
482 if (localSub != null) {
483 ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
484 localBroker.oneway(ack);
485 } else {
486 LOG.warn("Matching local subscription not found for ack: " + ack);
487 }
488 break;
489 case ConsumerInfo.DATA_STRUCTURE_TYPE:
490 localStartedLatch.await();
491 if (started.get()) {
492 if (!addConsumerInfo((ConsumerInfo) command)) {
493 if (LOG.isDebugEnabled()) {
494 LOG.debug("Ignoring ConsumerInfo: " + command);
495 }
496 } else {
497 if (LOG.isTraceEnabled()) {
498 LOG.trace("Adding ConsumerInfo: " + command);
499 }
500 }
501 } else {
502 // received a subscription whilst stopping
503 LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
504 }
505 break;
506 case ShutdownInfo.DATA_STRUCTURE_TYPE:
507 // initiator is shutting down, controlled case
508 // abortive close dealt with by inactivity monitor
509 LOG.info("Stopping network bridge on shutdown of remote broker");
510 serviceRemoteException(new IOException(command.toString()));
511 break;
512 default:
513 if (LOG.isDebugEnabled()) {
514 LOG.debug("Ignoring remote command: " + command);
515 }
516 }
517 }
518 } else {
519 switch (command.getDataStructureType()) {
520 case KeepAliveInfo.DATA_STRUCTURE_TYPE:
521 case WireFormatInfo.DATA_STRUCTURE_TYPE:
522 case ShutdownInfo.DATA_STRUCTURE_TYPE:
523 break;
524 default:
525 LOG.warn("Unexpected remote command: " + command);
526 }
527 }
528 }
529 } catch (Throwable e) {
530 if (LOG.isDebugEnabled()) {
531 LOG.debug("Exception processing remote command: " + command, e);
532 }
533 serviceRemoteException(e);
534 }
535 }
536 }
537
538 private void ackAdvisory(Message message) throws IOException {
539 demandConsumerDispatched++;
540 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
541 MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
542 ack.setConsumerId(demandConsumerInfo.getConsumerId());
543 remoteBroker.oneway(ack);
544 demandConsumerDispatched = 0;
545 }
546 }
547
548 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
549 final int networkTTL = configuration.getNetworkTTL();
550 if (data.getClass() == ConsumerInfo.class) {
551 // Create a new local subscription
552 ConsumerInfo info = (ConsumerInfo) data;
553 BrokerId[] path = info.getBrokerPath();
554
555 if (info.isBrowser()) {
556 if (LOG.isDebugEnabled()) {
557 LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed");
558 }
559 return;
560 }
561
562 if (path != null && path.length >= networkTTL) {
563 if (LOG.isDebugEnabled()) {
564 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
565 }
566 return;
567 }
568 if (contains(path, localBrokerPath[0])) {
569 // Ignore this consumer as it's a consumer we locally sent to the broker.
570 if (LOG.isDebugEnabled()) {
571 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
572 }
573 return;
574 }
575 if (!isPermissableDestination(info.getDestination())) {
576 // ignore if not in the permitted or in the excluded list
577 if (LOG.isDebugEnabled()) {
578 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
579 }
580 return;
581 }
582
583 // in a cyclic network there can be multiple bridges per broker that can propagate
584 // a network subscription so there is a need to synchronize on a shared entity
585 synchronized (brokerService.getVmConnectorURI()) {
586 if (addConsumerInfo(info)) {
587 if (LOG.isDebugEnabled()) {
588 LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
589 }
590 } else {
591 if (LOG.isDebugEnabled()) {
592 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
593 }
594 }
595 }
596 } else if (data.getClass() == DestinationInfo.class) {
597 // It's a destination info - we want to pass up information about temporary destinations
598 DestinationInfo destInfo = (DestinationInfo) data;
599 BrokerId[] path = destInfo.getBrokerPath();
600 if (path != null && path.length >= networkTTL) {
601 if (LOG.isDebugEnabled()) {
602 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
603 }
604 return;
605 }
606 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
607 // Ignore this consumer as it's a consumer we locally sent to the broker.
608 if (LOG.isDebugEnabled()) {
609 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
610 }
611 return;
612 }
613 destInfo.setConnectionId(localConnectionInfo.getConnectionId());
614 if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
615 // re-set connection id so comes from here
616 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
617 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
618 }
619 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
620 if (LOG.isTraceEnabled()) {
621 LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo);
622 }
623 localBroker.oneway(destInfo);
624 } else if (data.getClass() == RemoveInfo.class) {
625 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
626 removeDemandSubscription(id);
627 }
628 }
629
630 public void serviceLocalException(Throwable error) {
631 if (!disposed.get()) {
632 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
633 LOG.debug("The local Exception was:" + error, error);
634 brokerService.getTaskRunnerFactory().execute(new Runnable() {
635 public void run() {
636 ServiceSupport.dispose(getControllingService());
637 }
638 });
639 fireBridgeFailed();
640 }
641 }
642
643 protected Service getControllingService() {
644 return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
645 }
646
647 protected void addSubscription(DemandSubscription sub) throws IOException {
648 if (sub != null) {
649 localBroker.oneway(sub.getLocalInfo());
650 }
651 }
652
653 protected void removeSubscription(final DemandSubscription sub) throws IOException {
654 if (sub != null) {
655 if (LOG.isDebugEnabled()) {
656 LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
657 }
658 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
659 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
660
661 // continue removal in separate thread to free up this thread for outstanding responses
662 brokerService.getTaskRunnerFactory().execute(new Runnable() {
663 public void run() {
664 sub.waitForCompletion();
665 try {
666 localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
667 } catch (IOException e) {
668 LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
669 }
670 }
671 });
672 }
673 }
674
675 protected Message configureMessage(MessageDispatch md) throws IOException {
676 Message message = md.getMessage().copy();
677 // Update the packet to show where it came from.
678 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
679 message.setProducerId(producerInfo.getProducerId());
680 message.setDestination(md.getDestination());
681 if (message.getOriginalTransactionId() == null) {
682 message.setOriginalTransactionId(message.getTransactionId());
683 }
684 message.setTransactionId(null);
685 if (configuration.isUseCompression()) {
686 message.compress();
687 }
688 return message;
689 }
690
691 protected void serviceLocalCommand(Command command) {
692 if (!disposed.get()) {
693 try {
694 if (command.isMessageDispatch()) {
695 safeWaitUntilStarted();
696 enqueueCounter.incrementAndGet();
697 final MessageDispatch md = (MessageDispatch) command;
698 final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
699 if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
700
701 if (suppressMessageDispatch(md, sub)) {
702 if (LOG.isDebugEnabled()) {
703 LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
704 }
705 // still ack as it may be durable
706 try {
707 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
708 } finally {
709 sub.decrementOutstandingResponses();
710 }
711 return;
712 }
713
714 Message message = configureMessage(md);
715 if (LOG.isDebugEnabled()) {
716 LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
717 }
718
719 if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) {
720
721 // If the message was originally sent using async
722 // send, we will preserve that QOS
723 // by bridging it using an async send (small chance
724 // of message loss).
725 try {
726 remoteBroker.oneway(message);
727 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
728 dequeueCounter.incrementAndGet();
729 } finally {
730 sub.decrementOutstandingResponses();
731 }
732
733 } else {
734
735 // The message was not sent using async send, so we
736 // should only ack the local
737 // broker when we get confirmation that the remote
738 // broker has received the message.
739 ResponseCallback callback = new ResponseCallback() {
740 public void onCompletion(FutureResponse future) {
741 try {
742 Response response = future.getResult();
743 if (response.isException()) {
744 ExceptionResponse er = (ExceptionResponse) response;
745 serviceLocalException(er.getException());
746 } else {
747 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
748 dequeueCounter.incrementAndGet();
749 }
750 } catch (IOException e) {
751 serviceLocalException(e);
752 } finally {
753 sub.decrementOutstandingResponses();
754 }
755 }
756 };
757
758 remoteBroker.asyncRequest(message, callback);
759
760 }
761 } else {
762 if (LOG.isDebugEnabled()) {
763 LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
764 }
765 }
766 } else if (command.isBrokerInfo()) {
767 localBrokerInfo = (BrokerInfo) command;
768 serviceLocalBrokerInfo(command);
769 } else if (command.isShutdownInfo()) {
770 LOG.info(configuration.getBrokerName() + " Shutting down");
771 stop();
772 } else if (command.getClass() == ConnectionError.class) {
773 ConnectionError ce = (ConnectionError) command;
774 serviceLocalException(ce.getException());
775 } else {
776 switch (command.getDataStructureType()) {
777 case WireFormatInfo.DATA_STRUCTURE_TYPE:
778 break;
779 default:
780 LOG.warn("Unexpected local command: " + command);
781 }
782 }
783 } catch (Throwable e) {
784 LOG.warn("Caught an exception processing local command", e);
785 serviceLocalException(e);
786 }
787 }
788 }
789
790 private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
791 boolean suppress = false;
792 // for durable subs, suppression via filter leaves dangling acks so we need to
793 // check here and allow the ack irrespective
794 if (sub.getLocalInfo().isDurable()) {
795 MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
796 messageEvalContext.setMessageReference(md.getMessage());
797 messageEvalContext.setDestination(md.getDestination());
798 suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
799 }
800 return suppress;
801 }
802
803 /**
804 * @return Returns the dynamicallyIncludedDestinations.
805 */
806 public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
807 return dynamicallyIncludedDestinations;
808 }
809
810 /**
811 * @param dynamicallyIncludedDestinations The
812 * dynamicallyIncludedDestinations to set.
813 */
814 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
815 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
816 }
817
818 /**
819 * @return Returns the excludedDestinations.
820 */
821 public ActiveMQDestination[] getExcludedDestinations() {
822 return excludedDestinations;
823 }
824
825 /**
826 * @param excludedDestinations The excludedDestinations to set.
827 */
828 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
829 this.excludedDestinations = excludedDestinations;
830 }
831
832 /**
833 * @return Returns the staticallyIncludedDestinations.
834 */
835 public ActiveMQDestination[] getStaticallyIncludedDestinations() {
836 return staticallyIncludedDestinations;
837 }
838
839 /**
840 * @param staticallyIncludedDestinations The staticallyIncludedDestinations
841 * to set.
842 */
843 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
844 this.staticallyIncludedDestinations = staticallyIncludedDestinations;
845 }
846
847 /**
848 * @return Returns the durableDestinations.
849 */
850 public ActiveMQDestination[] getDurableDestinations() {
851 return durableDestinations;
852 }
853
854 /**
855 * @param durableDestinations The durableDestinations to set.
856 */
857 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
858 this.durableDestinations = durableDestinations;
859 }
860
861 /**
862 * @return Returns the localBroker.
863 */
864 public Transport getLocalBroker() {
865 return localBroker;
866 }
867
868 /**
869 * @return Returns the remoteBroker.
870 */
871 public Transport getRemoteBroker() {
872 return remoteBroker;
873 }
874
875 /**
876 * @return the createdByDuplex
877 */
878 public boolean isCreatedByDuplex() {
879 return this.createdByDuplex;
880 }
881
882 /**
883 * @param createdByDuplex the createdByDuplex to set
884 */
885 public void setCreatedByDuplex(boolean createdByDuplex) {
886 this.createdByDuplex = createdByDuplex;
887 }
888
889 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
890 if (brokerPath != null) {
891 for (int i = 0; i < brokerPath.length; i++) {
892 if (brokerId.equals(brokerPath[i])) {
893 return true;
894 }
895 }
896 }
897 return false;
898 }
899
900 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
901 if (brokerPath == null || brokerPath.length == 0) {
902 return pathsToAppend;
903 }
904 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
905 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
906 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
907 return rc;
908 }
909
910 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
911 if (brokerPath == null || brokerPath.length == 0) {
912 return new BrokerId[] { idToAppend };
913 }
914 BrokerId rc[] = new BrokerId[brokerPath.length + 1];
915 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
916 rc[brokerPath.length] = idToAppend;
917 return rc;
918 }
919
920 protected boolean isPermissableDestination(ActiveMQDestination destination) {
921 return isPermissableDestination(destination, false);
922 }
923
924 protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
925 // Are we not bridging temporary destinations?
926 if (destination.isTemporary()) {
927 if (allowTemporary) {
928 return true;
929 } else {
930 return configuration.isBridgeTempDestinations();
931 }
932 }
933
934 ActiveMQDestination[] dests = staticallyIncludedDestinations;
935 if (dests != null && dests.length > 0) {
936 for (int i = 0; i < dests.length; i++) {
937 ActiveMQDestination match = dests[i];
938 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
939 if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
940 return true;
941 }
942 }
943 }
944
945 dests = excludedDestinations;
946 if (dests != null && dests.length > 0) {
947 for (int i = 0; i < dests.length; i++) {
948 ActiveMQDestination match = dests[i];
949 DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match);
950 if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
951 return false;
952 }
953 }
954 }
955
956 dests = dynamicallyIncludedDestinations;
957 if (dests != null && dests.length > 0) {
958 for (int i = 0; i < dests.length; i++) {
959 ActiveMQDestination match = dests[i];
960 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
961 if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
962 return true;
963 }
964 }
965
966 return false;
967 }
968 return true;
969 }
970
971 /**
972 * Subscriptions for these destinations are always created
973 */
974 protected void setupStaticDestinations() {
975 ActiveMQDestination[] dests = staticallyIncludedDestinations;
976 if (dests != null) {
977 for (int i = 0; i < dests.length; i++) {
978 ActiveMQDestination dest = dests[i];
979 DemandSubscription sub = createDemandSubscription(dest);
980 try {
981 addSubscription(sub);
982 } catch (IOException e) {
983 LOG.error("Failed to add static destination " + dest, e);
984 }
985 if (LOG.isTraceEnabled()) {
986 LOG.trace("bridging messages for static destination: " + dest);
987 }
988 }
989 }
990 }
991
992 protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
993 boolean consumerAdded = false;
994 ConsumerInfo info = consumerInfo.copy();
995 addRemoteBrokerToBrokerPath(info);
996 DemandSubscription sub = createDemandSubscription(info);
997 if (sub != null) {
998 if (duplicateSuppressionIsRequired(sub)) {
999 undoMapRegistration(sub);
1000 } else {
1001 addSubscription(sub);
1002 consumerAdded = true;
1003 }
1004 }
1005 return consumerAdded;
1006 }
1007
1008 private void undoMapRegistration(DemandSubscription sub) {
1009 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1010 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1011 }
1012
1013 /*
1014 * check our existing subs networkConsumerIds against the list of network ids in this subscription
1015 * A match means a duplicate which we suppress for topics and maybe for queues
1016 */
1017 private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1018 final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1019 boolean suppress = false;
1020
1021 if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() ||
1022 consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1023 return suppress;
1024 }
1025
1026 List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1027 Collection<Subscription> currentSubs =
1028 getRegionSubscriptions(consumerInfo.getDestination());
1029 for (Subscription sub : currentSubs) {
1030 List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1031 if (!networkConsumers.isEmpty()) {
1032 if (matchFound(candidateConsumers, networkConsumers)) {
1033 if (isInActiveDurableSub(sub)) {
1034 suppress = false;
1035 } else {
1036 suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1037 }
1038 break;
1039 }
1040 }
1041 }
1042 return suppress;
1043 }
1044
1045 private boolean isInActiveDurableSub(Subscription sub) {
1046 return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isActive());
1047 }
1048
1049 private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1050 boolean suppress = false;
1051
1052 if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1053 if (LOG.isDebugEnabled()) {
1054 LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
1055 + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
1056 + existingSub + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
1057 }
1058 suppress = true;
1059 } else {
1060 // remove the existing lower priority duplicate and allow this candidate
1061 try {
1062 removeDuplicateSubscription(existingSub);
1063
1064 if (LOG.isDebugEnabled()) {
1065 LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
1066 + " with sub from " + remoteBrokerName
1067 + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
1068 + candidateInfo.getNetworkConsumerIds());
1069 }
1070 } catch (IOException e) {
1071 LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
1072 }
1073 }
1074 return suppress;
1075 }
1076
1077 private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1078 for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1079 if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1080 break;
1081 }
1082 }
1083 }
1084
1085 private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1086 boolean found = false;
1087 for (ConsumerId aliasConsumer : networkConsumers) {
1088 if (candidateConsumers.contains(aliasConsumer)) {
1089 found = true;
1090 break;
1091 }
1092 }
1093 return found;
1094 }
1095
1096 private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1097 RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1098 Region region;
1099 Collection<Subscription> subs;
1100
1101 region = null;
1102 switch ( dest.getDestinationType() )
1103 {
1104 case ActiveMQDestination.QUEUE_TYPE:
1105 region = region_broker.getQueueRegion();
1106 break;
1107
1108 case ActiveMQDestination.TOPIC_TYPE:
1109 region = region_broker.getTopicRegion();
1110 break;
1111
1112 case ActiveMQDestination.TEMP_QUEUE_TYPE:
1113 region = region_broker.getTempQueueRegion();
1114 break;
1115
1116 case ActiveMQDestination.TEMP_TOPIC_TYPE:
1117 region = region_broker.getTempTopicRegion();
1118 break;
1119 }
1120
1121 if ( region instanceof AbstractRegion ) {
1122 subs = ((AbstractRegion) region).getSubscriptions().values();
1123 } else {
1124 subs = null;
1125 }
1126
1127 return subs;
1128 }
1129
1130 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1131 //add our original id to ourselves
1132 info.addNetworkConsumerId(info.getConsumerId());
1133 return doCreateDemandSubscription(info);
1134 }
1135
1136 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1137 DemandSubscription result = new DemandSubscription(info);
1138 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1139 if (info.getDestination().isTemporary()) {
1140 // reset the local connection Id
1141 ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1142 dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1143 }
1144
1145 if (configuration.isDecreaseNetworkConsumerPriority()) {
1146 byte priority = (byte) configuration.getConsumerPriorityBase();
1147 if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1148 // The longer the path to the consumer, the less it's consumer priority.
1149 priority -= info.getBrokerPath().length + 1;
1150 }
1151 result.getLocalInfo().setPriority(priority);
1152 if (LOG.isDebugEnabled()) {
1153 LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
1154 }
1155 }
1156 configureDemandSubscription(info, result);
1157 return result;
1158 }
1159
1160 final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1161 ConsumerInfo info = new ConsumerInfo();
1162 info.setDestination(destination);
1163
1164 // the remote info held by the DemandSubscription holds the original consumerId,
1165 // the local info get's overwritten
1166 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1167 DemandSubscription result = null;
1168 try {
1169 result = createDemandSubscription(info);
1170 } catch (IOException e) {
1171 LOG.error("Failed to create DemandSubscription ", e);
1172 }
1173 return result;
1174 }
1175
1176 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1177 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1178 sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1179 subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1180 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1181
1182 sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1183 if (!info.isDurable()) {
1184 // This works for now since we use a VM connection to the local broker.
1185 // may need to change if we ever subscribe to a remote broker.
1186 sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1187 } else {
1188 // need to ack this message if it is ignored as it is durable so
1189 // we check before we send. see: suppressMessageDispatch()
1190 }
1191 }
1192
1193 protected void removeDemandSubscription(ConsumerId id) throws IOException {
1194 DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1195 if (LOG.isDebugEnabled()) {
1196 LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
1197 }
1198 if (sub != null) {
1199 removeSubscription(sub);
1200 if (LOG.isDebugEnabled()) {
1201 LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " : " + sub.getRemoteInfo());
1202 }
1203 }
1204 }
1205
1206 protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1207 boolean removeDone = false;
1208 DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1209 if (sub != null) {
1210 try {
1211 removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1212 removeDone = true;
1213 } catch (IOException e) {
1214 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
1215 }
1216 }
1217 return removeDone;
1218 }
1219
1220 /**
1221 * Performs a timed wait on the started latch and then checks for disposed before performing
1222 * another wait each time the the started wait times out.
1223 *
1224 * @throws InterruptedException
1225 */
1226 protected void safeWaitUntilStarted() throws InterruptedException {
1227 while (!disposed.get()) {
1228 if (startedLatch.await(1, TimeUnit.SECONDS)) {
1229 return;
1230 }
1231 }
1232 }
1233
1234 protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1235 NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1236 if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1237 PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1238 if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1239 filterFactory = entry.getNetworkBridgeFilterFactory();
1240 }
1241 }
1242 return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL());
1243 }
1244
1245 protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
1246 synchronized (brokerInfoMutex) {
1247 if (remoteBrokerId != null) {
1248 if (remoteBrokerId.equals(localBrokerId)) {
1249 if (LOG.isTraceEnabled()) {
1250 LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
1251 }
1252 safeWaitUntilStarted();
1253 ServiceSupport.dispose(this);
1254 }
1255 }
1256 }
1257 }
1258
1259 protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1260 info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1261 }
1262
1263 protected void serviceRemoteBrokerInfo(Command command) throws IOException {
1264 synchronized (brokerInfoMutex) {
1265 BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
1266 remoteBrokerId = remoteBrokerInfo.getBrokerId();
1267 remoteBrokerPath[0] = remoteBrokerId;
1268 remoteBrokerName = remoteBrokerInfo.getBrokerName();
1269 if (localBrokerId != null) {
1270 if (localBrokerId.equals(remoteBrokerId)) {
1271 if (LOG.isTraceEnabled()) {
1272 LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
1273 }
1274 ServiceSupport.dispose(this);
1275 }
1276 }
1277 if (!disposed.get()) {
1278 triggerLocalStartBridge();
1279 }
1280 }
1281 }
1282
1283 protected BrokerId[] getRemoteBrokerPath() {
1284 return remoteBrokerPath;
1285 }
1286
1287 public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1288 this.networkBridgeListener = listener;
1289 }
1290
1291 private void fireBridgeFailed() {
1292 NetworkBridgeListener l = this.networkBridgeListener;
1293 if (l != null) {
1294 l.bridgeFailed();
1295 }
1296 }
1297
1298 public String getRemoteAddress() {
1299 return remoteBroker.getRemoteAddress();
1300 }
1301
1302 public String getLocalAddress() {
1303 return localBroker.getRemoteAddress();
1304 }
1305
1306 public String getRemoteBrokerName() {
1307 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1308 }
1309
1310 public String getLocalBrokerName() {
1311 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1312 }
1313
1314 public long getDequeueCounter() {
1315 return dequeueCounter.get();
1316 }
1317
1318 public long getEnqueueCounter() {
1319 return enqueueCounter.get();
1320 }
1321
1322 protected boolean isDuplex() {
1323 return configuration.isDuplex() || createdByDuplex;
1324 }
1325
1326 public ConcurrentHashMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1327 return subscriptionMapByRemoteId;
1328 }
1329
1330 public void setBrokerService(BrokerService brokerService) {
1331 this.brokerService = brokerService;
1332 this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1333 localBrokerPath[0] = localBrokerId;
1334 }
1335
1336 public void setMbeanObjectName(ObjectName objectName) {
1337 this.mbeanObjectName = objectName;
1338 }
1339
1340 public ObjectName getMbeanObjectName() {
1341 return mbeanObjectName;
1342 }
1343 }