001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.network;
018
019 import java.io.IOException;
020 import java.security.GeneralSecurityException;
021 import java.security.cert.X509Certificate;
022 import java.util.Arrays;
023 import java.util.Collection;
024 import java.util.List;
025 import java.util.Properties;
026 import java.util.concurrent.ConcurrentHashMap;
027 import java.util.concurrent.CountDownLatch;
028 import java.util.concurrent.ExecutionException;
029 import java.util.concurrent.ExecutorService;
030 import java.util.concurrent.Executors;
031 import java.util.concurrent.Future;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.TimeoutException;
034 import java.util.concurrent.atomic.AtomicBoolean;
035 import java.util.concurrent.atomic.AtomicLong;
036
037 import javax.management.ObjectName;
038
039 import org.apache.activemq.DestinationDoesNotExistException;
040 import org.apache.activemq.Service;
041 import org.apache.activemq.advisory.AdvisoryBroker;
042 import org.apache.activemq.advisory.AdvisorySupport;
043 import org.apache.activemq.broker.BrokerService;
044 import org.apache.activemq.broker.BrokerServiceAware;
045 import org.apache.activemq.broker.ConnectionContext;
046 import org.apache.activemq.broker.TransportConnection;
047 import org.apache.activemq.broker.region.AbstractRegion;
048 import org.apache.activemq.broker.region.DurableTopicSubscription;
049 import org.apache.activemq.broker.region.Region;
050 import org.apache.activemq.broker.region.RegionBroker;
051 import org.apache.activemq.broker.region.Subscription;
052 import org.apache.activemq.broker.region.policy.PolicyEntry;
053 import org.apache.activemq.command.ActiveMQDestination;
054 import org.apache.activemq.command.ActiveMQMessage;
055 import org.apache.activemq.command.ActiveMQTempDestination;
056 import org.apache.activemq.command.ActiveMQTopic;
057 import org.apache.activemq.command.BrokerId;
058 import org.apache.activemq.command.BrokerInfo;
059 import org.apache.activemq.command.Command;
060 import org.apache.activemq.command.ConnectionError;
061 import org.apache.activemq.command.ConnectionId;
062 import org.apache.activemq.command.ConnectionInfo;
063 import org.apache.activemq.command.ConsumerId;
064 import org.apache.activemq.command.ConsumerInfo;
065 import org.apache.activemq.command.DataStructure;
066 import org.apache.activemq.command.DestinationInfo;
067 import org.apache.activemq.command.ExceptionResponse;
068 import org.apache.activemq.command.KeepAliveInfo;
069 import org.apache.activemq.command.Message;
070 import org.apache.activemq.command.MessageAck;
071 import org.apache.activemq.command.MessageDispatch;
072 import org.apache.activemq.command.NetworkBridgeFilter;
073 import org.apache.activemq.command.ProducerInfo;
074 import org.apache.activemq.command.RemoveInfo;
075 import org.apache.activemq.command.Response;
076 import org.apache.activemq.command.SessionInfo;
077 import org.apache.activemq.command.ShutdownInfo;
078 import org.apache.activemq.command.WireFormatInfo;
079 import org.apache.activemq.filter.DestinationFilter;
080 import org.apache.activemq.filter.MessageEvaluationContext;
081 import org.apache.activemq.security.SecurityContext;
082 import org.apache.activemq.transport.DefaultTransportListener;
083 import org.apache.activemq.transport.FutureResponse;
084 import org.apache.activemq.transport.ResponseCallback;
085 import org.apache.activemq.transport.Transport;
086 import org.apache.activemq.transport.TransportDisposedIOException;
087 import org.apache.activemq.transport.TransportFilter;
088 import org.apache.activemq.transport.tcp.SslTransport;
089 import org.apache.activemq.util.IdGenerator;
090 import org.apache.activemq.util.IntrospectionSupport;
091 import org.apache.activemq.util.LongSequenceGenerator;
092 import org.apache.activemq.util.MarshallingSupport;
093 import org.apache.activemq.util.ServiceStopper;
094 import org.apache.activemq.util.ServiceSupport;
095 import org.slf4j.Logger;
096 import org.slf4j.LoggerFactory;
097
098 /**
099 * A useful base class for implementing demand forwarding bridges.
100 */
101 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
102 private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
103 protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
104 protected final Transport localBroker;
105 protected final Transport remoteBroker;
106 protected final IdGenerator idGenerator = new IdGenerator();
107 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
108 protected ConnectionInfo localConnectionInfo;
109 protected ConnectionInfo remoteConnectionInfo;
110 protected SessionInfo localSessionInfo;
111 protected ProducerInfo producerInfo;
112 protected String remoteBrokerName = "Unknown";
113 protected String localClientId;
114 protected ConsumerInfo demandConsumerInfo;
115 protected int demandConsumerDispatched;
116 protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
117 protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
118 protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
119 protected final AtomicBoolean disposed = new AtomicBoolean();
120 protected BrokerId localBrokerId;
121 protected ActiveMQDestination[] excludedDestinations;
122 protected ActiveMQDestination[] dynamicallyIncludedDestinations;
123 protected ActiveMQDestination[] staticallyIncludedDestinations;
124 protected ActiveMQDestination[] durableDestinations;
125 protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
126 protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
127 protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
128 protected final CountDownLatch startedLatch = new CountDownLatch(2);
129 protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
130 protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
131 protected NetworkBridgeConfiguration configuration;
132 protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
133
134 protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
135 protected BrokerId remoteBrokerId;
136
137 final AtomicLong enqueueCounter = new AtomicLong();
138 final AtomicLong dequeueCounter = new AtomicLong();
139
140 private NetworkBridgeListener networkBridgeListener;
141 private boolean createdByDuplex;
142 private BrokerInfo localBrokerInfo;
143 private BrokerInfo remoteBrokerInfo;
144
145 private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
146 private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
147
148 private final AtomicBoolean started = new AtomicBoolean();
149 private TransportConnection duplexInitiatingConnection;
150 private BrokerService brokerService = null;
151 private ObjectName mbeanObjectName;
152 private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
153 private Transport duplexInboundLocalBroker = null;
154 private ProducerInfo duplexInboundLocalProducerInfo;
155
/**
 * Creates a bridge over the two given transports. Nothing is started here;
 * the arguments are only stored.
 *
 * @param configuration settings for this bridge
 * @param localBroker transport to the local broker
 * @param remoteBroker transport to the remote broker
 */
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
    this.remoteBroker = remoteBroker;
    this.localBroker = localBroker;
    this.configuration = configuration;
}
161
162 public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
163 this.localBrokerInfo = localBrokerInfo;
164 this.remoteBrokerInfo = remoteBrokerInfo;
165 this.duplexInitiatingConnection = connection;
166 start();
167 serviceRemoteCommand(remoteBrokerInfo);
168 }
169
170 @Override
171 public void start() throws Exception {
172 if (started.compareAndSet(false, true)) {
173
174 if (brokerService == null) {
175 throw new IllegalArgumentException("BrokerService is null on " + this);
176 }
177
178 if (isDuplex()) {
179 duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
180 duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
181
182 @Override
183 public void onCommand(Object o) {
184 Command command = (Command) o;
185 serviceLocalCommand(command);
186 }
187
188 @Override
189 public void onException(IOException error) {
190 serviceLocalException(error);
191 }
192 });
193 duplexInboundLocalBroker.start();
194 }
195
196 localBroker.setTransportListener(new DefaultTransportListener() {
197
198 @Override
199 public void onCommand(Object o) {
200 Command command = (Command) o;
201 serviceLocalCommand(command);
202 }
203
204 @Override
205 public void onException(IOException error) {
206 if (!futureLocalBrokerInfo.isDone()) {
207 futureLocalBrokerInfo.cancel(true);
208 return;
209 }
210 serviceLocalException(error);
211 }
212 });
213
214 remoteBroker.setTransportListener(new DefaultTransportListener() {
215
216 @Override
217 public void onCommand(Object o) {
218 Command command = (Command) o;
219 serviceRemoteCommand(command);
220 }
221
222 @Override
223 public void onException(IOException error) {
224 if (!futureRemoteBrokerInfo.isDone()) {
225 futureRemoteBrokerInfo.cancel(true);
226 return;
227 }
228 serviceRemoteException(error);
229 }
230 });
231
232 remoteBroker.start();
233 localBroker.start();
234
235 if (!disposed.get()) {
236 try {
237 triggerStartAsyncNetworkBridgeCreation();
238 } catch (IOException e) {
239 LOG.warn("Caught exception from remote start", e);
240 }
241 } else {
242 LOG.warn("Bridge was disposed before the start() method was fully executed.");
243 throw new TransportDisposedIOException();
244 }
245 }
246 }
247
248 @Override
249 public void stop() throws Exception {
250 if (started.compareAndSet(true, false)) {
251 if (disposed.compareAndSet(false, true)) {
252 if (LOG.isDebugEnabled()) {
253 LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
254 }
255
256 futureRemoteBrokerInfo.cancel(true);
257 futureLocalBrokerInfo.cancel(true);
258
259 NetworkBridgeListener l = this.networkBridgeListener;
260 if (l != null) {
261 l.onStop(this);
262 }
263 try {
264 // local start complete
265 if (startedLatch.getCount() < 2) {
266 if (LOG.isTraceEnabled()) {
267 LOG.trace(configuration.getBrokerName() + " unregister bridge (" + this + ") to " + remoteBrokerName);
268 }
269 brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
270 brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
271 }
272
273 remoteBridgeStarted.set(false);
274 final CountDownLatch sendShutdown = new CountDownLatch(1);
275
276 brokerService.getTaskRunnerFactory().execute(new Runnable() {
277 @Override
278 public void run() {
279 try {
280 serialExecutor.shutdown();
281 if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
282 List<Runnable> pendingTasks = serialExecutor.shutdownNow();
283 if (LOG.isInfoEnabled()) {
284 LOG.info("pending tasks on stop" + pendingTasks);
285 }
286 }
287 localBroker.oneway(new ShutdownInfo());
288 remoteBroker.oneway(new ShutdownInfo());
289 } catch (Throwable e) {
290 if (LOG.isDebugEnabled()) {
291 LOG.debug("Caught exception sending shutdown", e);
292 }
293 } finally {
294 sendShutdown.countDown();
295 }
296
297 }
298 }, "ActiveMQ ForwardingBridge StopTask");
299
300 if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
301 LOG.info("Network Could not shutdown in a timely manner");
302 }
303 } finally {
304 ServiceStopper ss = new ServiceStopper();
305 ss.stop(remoteBroker);
306 ss.stop(localBroker);
307 ss.stop(duplexInboundLocalBroker);
308 // Release the started Latch since another thread could be
309 // stuck waiting for it to start up.
310 startedLatch.countDown();
311 startedLatch.countDown();
312 localStartedLatch.countDown();
313
314 ss.throwFirstException();
315 }
316 }
317
318 if (LOG.isInfoEnabled()) {
319 LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
320 }
321 }
322 }
323
324 protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
325 brokerService.getTaskRunnerFactory().execute(new Runnable() {
326 @Override
327 public void run() {
328 final String originalName = Thread.currentThread().getName();
329 Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
330 "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
331
332 try {
333 // First we collect the info data from both the local and remote ends
334 collectBrokerInfos();
335
336 // Once we have all required broker info we can attempt to start
337 // the local and then remote sides of the bridge.
338 doStartLocalAndRemoteBridges();
339 } finally {
340 Thread.currentThread().setName(originalName);
341 }
342 }
343 });
344 }
345
346 private void collectBrokerInfos() {
347
348 // First wait for the remote to feed us its BrokerInfo, then we can check on
349 // the LocalBrokerInfo and decide is this is a loop.
350 try {
351 remoteBrokerInfo = futureRemoteBrokerInfo.get();
352 if (remoteBrokerInfo == null) {
353 fireBridgeFailed();
354 }
355 } catch (Exception e) {
356 serviceRemoteException(e);
357 return;
358 }
359
360 try {
361 localBrokerInfo = futureLocalBrokerInfo.get();
362 if (localBrokerInfo == null) {
363 fireBridgeFailed();
364 }
365
366 // Before we try and build the bridge lets check if we are in a loop
367 // and if so just stop now before registering anything.
368 if (localBrokerId.equals(remoteBrokerId)) {
369 if (LOG.isTraceEnabled()) {
370 LOG.trace(configuration.getBrokerName() +
371 " disconnecting remote loop back connection for: " +
372 remoteBrokerName + ", with id:" + remoteBrokerId);
373 }
374 ServiceSupport.dispose(localBroker);
375 ServiceSupport.dispose(remoteBroker);
376 return;
377 }
378
379 // Fill in the remote broker's information now.
380 remoteBrokerId = remoteBrokerInfo.getBrokerId();
381 remoteBrokerPath[0] = remoteBrokerId;
382 remoteBrokerName = remoteBrokerInfo.getBrokerName();
383 } catch (Throwable e) {
384 serviceLocalException(e);
385 }
386 }
387
388 private void doStartLocalAndRemoteBridges() {
389
390 if (disposed.get()) {
391 return;
392 }
393
394 if (isCreatedByDuplex()) {
395 // apply remote (propagated) configuration to local duplex bridge before start
396 Properties props = null;
397 try {
398 props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
399 IntrospectionSupport.getProperties(configuration, props, null);
400 if (configuration.getExcludedDestinations() != null) {
401 excludedDestinations = configuration.getExcludedDestinations().toArray(
402 new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
403 }
404 if (configuration.getStaticallyIncludedDestinations() != null) {
405 staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
406 new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
407 }
408 if (configuration.getDynamicallyIncludedDestinations() != null) {
409 dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
410 new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
411 }
412 } catch (Throwable t) {
413 LOG.error("Error mapping remote configuration: " + props, t);
414 }
415 }
416
417 try {
418 startLocalBridge();
419 } catch (Throwable e) {
420 serviceLocalException(e);
421 return;
422 }
423
424 try {
425 startRemoteBridge();
426 } catch (Throwable e) {
427 serviceRemoteException(e);
428 }
429 }
430
431 private void startLocalBridge() throws Throwable {
432 if (localBridgeStarted.compareAndSet(false, true)) {
433 synchronized (this) {
434 if (LOG.isTraceEnabled()) {
435 LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker);
436 }
437 if (!disposed.get()) {
438 localConnectionInfo = new ConnectionInfo();
439 localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
440 localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
441 localConnectionInfo.setClientId(localClientId);
442 localConnectionInfo.setUserName(configuration.getUserName());
443 localConnectionInfo.setPassword(configuration.getPassword());
444 Transport originalTransport = remoteBroker;
445 while (originalTransport instanceof TransportFilter) {
446 originalTransport = ((TransportFilter) originalTransport).getNext();
447 }
448 if (originalTransport instanceof SslTransport) {
449 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
450 localConnectionInfo.setTransportContext(peerCerts);
451 }
452 // sync requests that may fail
453 Object resp = localBroker.request(localConnectionInfo);
454 if (resp instanceof ExceptionResponse) {
455 throw ((ExceptionResponse) resp).getException();
456 }
457 localSessionInfo = new SessionInfo(localConnectionInfo, 1);
458 localBroker.oneway(localSessionInfo);
459
460 if (configuration.isDuplex()) {
461 // separate in-bound channel for forwards so we don't
462 // contend with out-bound dispatch on same connection
463 ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
464 duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
465 duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
466 + configuration.getBrokerName());
467 duplexLocalConnectionInfo.setUserName(configuration.getUserName());
468 duplexLocalConnectionInfo.setPassword(configuration.getPassword());
469
470 if (originalTransport instanceof SslTransport) {
471 X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
472 duplexLocalConnectionInfo.setTransportContext(peerCerts);
473 }
474 // sync requests that may fail
475 resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
476 if (resp instanceof ExceptionResponse) {
477 throw ((ExceptionResponse) resp).getException();
478 }
479 SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
480 duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
481 duplexInboundLocalBroker.oneway(duplexInboundSession);
482 duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
483 }
484 brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
485 NetworkBridgeListener l = this.networkBridgeListener;
486 if (l != null) {
487 l.onStart(this);
488 }
489
490 // Let the local broker know the remote broker's ID.
491 localBroker.oneway(remoteBrokerInfo);
492 // new peer broker (a consumer can work with remote broker also)
493 brokerService.getBroker().addBroker(null, remoteBrokerInfo);
494
495 if (LOG.isInfoEnabled()) {
496 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
497 if (LOG.isTraceEnabled()) {
498 LOG.trace(configuration.getBrokerName() + " register bridge (" + this + ") to " + remoteBrokerName);
499 }
500 }
501 } else {
502 LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
503 }
504 startedLatch.countDown();
505 localStartedLatch.countDown();
506 }
507
508 if (!disposed.get()) {
509 setupStaticDestinations();
510 } else {
511 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName
512 + ") was interrupted during establishment.");
513 }
514 }
515 }
516
517 protected void startRemoteBridge() throws Exception {
518 if (remoteBridgeStarted.compareAndSet(false, true)) {
519 if (LOG.isTraceEnabled()) {
520 LOG.trace(configuration.getBrokerName() + " starting remote Bridge, remoteBroker=" + remoteBroker);
521 }
522 synchronized (this) {
523 if (!isCreatedByDuplex()) {
524 BrokerInfo brokerInfo = new BrokerInfo();
525 brokerInfo.setBrokerName(configuration.getBrokerName());
526 brokerInfo.setBrokerURL(configuration.getBrokerURL());
527 brokerInfo.setNetworkConnection(true);
528 brokerInfo.setDuplexConnection(configuration.isDuplex());
529 // set our properties
530 Properties props = new Properties();
531 IntrospectionSupport.getProperties(configuration, props, null);
532 String str = MarshallingSupport.propertiesToString(props);
533 brokerInfo.setNetworkProperties(str);
534 brokerInfo.setBrokerId(this.localBrokerId);
535 remoteBroker.oneway(brokerInfo);
536 }
537 if (remoteConnectionInfo != null) {
538 remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
539 }
540 remoteConnectionInfo = new ConnectionInfo();
541 remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
542 remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
543 remoteConnectionInfo.setUserName(configuration.getUserName());
544 remoteConnectionInfo.setPassword(configuration.getPassword());
545 remoteBroker.oneway(remoteConnectionInfo);
546
547 SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
548 remoteBroker.oneway(remoteSessionInfo);
549 producerInfo = new ProducerInfo(remoteSessionInfo, 1);
550 producerInfo.setResponseRequired(false);
551 remoteBroker.oneway(producerInfo);
552 // Listen to consumer advisory messages on the remote broker to determine demand.
553 if (!configuration.isStaticBridge()) {
554 demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
555 // always dispatch advisory message asynchronously so that
556 // we never block the producer broker if we are slow
557 demandConsumerInfo.setDispatchAsync(true);
558 String advisoryTopic = configuration.getDestinationFilter();
559 if (configuration.isBridgeTempDestinations()) {
560 advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
561 }
562 demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
563 demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
564 remoteBroker.oneway(demandConsumerInfo);
565 }
566 startedLatch.countDown();
567 }
568 }
569 }
570
571 @Override
572 public void serviceRemoteException(Throwable error) {
573 if (!disposed.get()) {
574 if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
575 LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
576 } else {
577 LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
578 }
579 LOG.debug("The remote Exception was: " + error, error);
580 brokerService.getTaskRunnerFactory().execute(new Runnable() {
581 @Override
582 public void run() {
583 ServiceSupport.dispose(getControllingService());
584 }
585 });
586 fireBridgeFailed();
587 }
588 }
589
590 protected void serviceRemoteCommand(Command command) {
591 if (!disposed.get()) {
592 try {
593 if (command.isMessageDispatch()) {
594 safeWaitUntilStarted();
595 MessageDispatch md = (MessageDispatch) command;
596 serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
597 ackAdvisory(md.getMessage());
598 } else if (command.isBrokerInfo()) {
599 futureRemoteBrokerInfo.set((BrokerInfo) command);
600 } else if (command.getClass() == ConnectionError.class) {
601 ConnectionError ce = (ConnectionError) command;
602 serviceRemoteException(ce.getException());
603 } else {
604 if (isDuplex()) {
605 if (LOG.isTraceEnabled()) {
606 LOG.trace(configuration.getBrokerName() + " duplex command type: " + command.getDataStructureType());
607 }
608 if (command.isMessage()) {
609 final ActiveMQMessage message = (ActiveMQMessage) command;
610 if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
611 || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
612 serviceRemoteConsumerAdvisory(message.getDataStructure());
613 ackAdvisory(message);
614 } else {
615 if (!isPermissableDestination(message.getDestination(), true)) {
616 return;
617 }
618 // message being forwarded - we need to
619 // propagate the response to our local send
620 message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
621 if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
622 duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
623 final int correlationId = message.getCommandId();
624
625 @Override
626 public void onCompletion(FutureResponse resp) {
627 try {
628 Response reply = resp.getResult();
629 reply.setCorrelationId(correlationId);
630 remoteBroker.oneway(reply);
631 } catch (IOException error) {
632 LOG.error("Exception: " + error + " on duplex forward of: " + message);
633 serviceRemoteException(error);
634 }
635 }
636 });
637 } else {
638 duplexInboundLocalBroker.oneway(message);
639 }
640 }
641 } else {
642 switch (command.getDataStructureType()) {
643 case ConnectionInfo.DATA_STRUCTURE_TYPE:
644 case SessionInfo.DATA_STRUCTURE_TYPE:
645 localBroker.oneway(command);
646 break;
647 case ProducerInfo.DATA_STRUCTURE_TYPE:
648 // using duplexInboundLocalProducerInfo
649 break;
650 case MessageAck.DATA_STRUCTURE_TYPE:
651 MessageAck ack = (MessageAck) command;
652 DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
653 if (localSub != null) {
654 ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
655 localBroker.oneway(ack);
656 } else {
657 LOG.warn("Matching local subscription not found for ack: " + ack);
658 }
659 break;
660 case ConsumerInfo.DATA_STRUCTURE_TYPE:
661 localStartedLatch.await();
662 if (started.get()) {
663 if (!addConsumerInfo((ConsumerInfo) command)) {
664 if (LOG.isDebugEnabled()) {
665 LOG.debug("Ignoring ConsumerInfo: " + command);
666 }
667 } else {
668 if (LOG.isTraceEnabled()) {
669 LOG.trace("Adding ConsumerInfo: " + command);
670 }
671 }
672 } else {
673 // received a subscription whilst stopping
674 LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
675 }
676 break;
677 case ShutdownInfo.DATA_STRUCTURE_TYPE:
678 // initiator is shutting down, controlled case
679 // abortive close dealt with by inactivity monitor
680 LOG.info("Stopping network bridge on shutdown of remote broker");
681 serviceRemoteException(new IOException(command.toString()));
682 break;
683 default:
684 if (LOG.isDebugEnabled()) {
685 LOG.debug("Ignoring remote command: " + command);
686 }
687 }
688 }
689 } else {
690 switch (command.getDataStructureType()) {
691 case KeepAliveInfo.DATA_STRUCTURE_TYPE:
692 case WireFormatInfo.DATA_STRUCTURE_TYPE:
693 case ShutdownInfo.DATA_STRUCTURE_TYPE:
694 break;
695 default:
696 LOG.warn("Unexpected remote command: " + command);
697 }
698 }
699 }
700 } catch (Throwable e) {
701 if (LOG.isDebugEnabled()) {
702 LOG.debug("Exception processing remote command: " + command, e);
703 }
704 serviceRemoteException(e);
705 }
706 }
707 }
708
709 private void ackAdvisory(Message message) throws IOException {
710 demandConsumerDispatched++;
711 if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
712 MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
713 ack.setConsumerId(demandConsumerInfo.getConsumerId());
714 remoteBroker.oneway(ack);
715 demandConsumerDispatched = 0;
716 }
717 }
718
719 private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
720 final int networkTTL = configuration.getNetworkTTL();
721 if (data.getClass() == ConsumerInfo.class) {
722 // Create a new local subscription
723 ConsumerInfo info = (ConsumerInfo) data;
724 BrokerId[] path = info.getBrokerPath();
725
726 if (info.isBrowser()) {
727 if (LOG.isDebugEnabled()) {
728 LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed");
729 }
730 return;
731 }
732
733 if (path != null && path.length >= networkTTL) {
734 if (LOG.isDebugEnabled()) {
735 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL
736 + " network hops only : " + info);
737 }
738 return;
739 }
740
741 if (contains(path, localBrokerPath[0])) {
742 // Ignore this consumer as it's a consumer we locally sent to the broker.
743 if (LOG.isDebugEnabled()) {
744 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
745 }
746 return;
747 }
748
749 if (!isPermissableDestination(info.getDestination())) {
750 // ignore if not in the permitted or in the excluded list
751 if (LOG.isDebugEnabled()) {
752 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination()
753 + " is not permiited :" + info);
754 }
755 return;
756 }
757
758 // in a cyclic network there can be multiple bridges per broker that can propagate
759 // a network subscription so there is a need to synchronize on a shared entity
760 synchronized (brokerService.getVmConnectorURI()) {
761 if (addConsumerInfo(info)) {
762 if (LOG.isDebugEnabled()) {
763 LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
764 }
765 } else {
766 if (LOG.isDebugEnabled()) {
767 LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName
768 + " as already subscribed to matching destination : " + info);
769 }
770 }
771 }
772 } else if (data.getClass() == DestinationInfo.class) {
773 // It's a destination info - we want to pass up information about temporary destinations
774 final DestinationInfo destInfo = (DestinationInfo) data;
775 BrokerId[] path = destInfo.getBrokerPath();
776 if (path != null && path.length >= networkTTL) {
777 if (LOG.isDebugEnabled()) {
778 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
779 }
780 return;
781 }
782 if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
783 // Ignore this consumer as it's a consumer we locally sent to the broker.
784 if (LOG.isDebugEnabled()) {
785 LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
786 }
787 return;
788 }
789 destInfo.setConnectionId(localConnectionInfo.getConnectionId());
790 if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
791 // re-set connection id so comes from here
792 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
793 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
794 }
795 destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
796 if (LOG.isTraceEnabled()) {
797 LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker
798 + " from " + remoteBrokerName + ", destination: " + destInfo);
799 }
800 if (destInfo.isRemoveOperation()) {
801 // Serialize with removeSub operations such that all removeSub advisories
802 // are generated
803 serialExecutor.execute(new Runnable() {
804 @Override
805 public void run() {
806 try {
807 localBroker.oneway(destInfo);
808 } catch (IOException e) {
809 LOG.warn("failed to deliver remove command for destination:" + destInfo.getDestination(), e);
810 }
811 }
812 });
813 } else {
814 localBroker.oneway(destInfo);
815 }
816 } else if (data.getClass() == RemoveInfo.class) {
817 ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
818 removeDemandSubscription(id);
819 }
820 }
821
822 @Override
823 public void serviceLocalException(Throwable error) {
824 serviceLocalException(null, error);
825 }
826
827 public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
828
829 if (!disposed.get()) {
830 if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
831 // not a reason to terminate the bridge - temps can disappear with
832 // pending sends as the demand sub may outlive the remote dest
833 if (messageDispatch != null) {
834 LOG.warn("PoisonAck of " + messageDispatch.getMessage().getMessageId() + " on forwarding error: " + error);
835 try {
836 MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
837 poisonAck.setPoisonCause(error);
838 localBroker.oneway(poisonAck);
839 } catch (IOException ioe) {
840 LOG.error("Failed to posion ack message following forward failure: " + ioe, ioe);
841 }
842 fireFailedForwardAdvisory(messageDispatch, error);
843 } else {
844 LOG.warn("Ignoring exception on forwarding to non existent temp dest: " + error, error);
845 }
846 return;
847 }
848
849 if (LOG.isInfoEnabled()) {
850 LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
851 }
852 if (LOG.isDebugEnabled()) {
853 LOG.debug("The local Exception was:" + error, error);
854 }
855
856 brokerService.getTaskRunnerFactory().execute(new Runnable() {
857 @Override
858 public void run() {
859 ServiceSupport.dispose(getControllingService());
860 }
861 });
862 fireBridgeFailed();
863 }
864 }
865
866 private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
867 if (configuration.isAdvisoryForFailedForward()) {
868 AdvisoryBroker advisoryBroker = null;
869 try {
870 advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
871
872 if (advisoryBroker != null) {
873 ConnectionContext context = new ConnectionContext();
874 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
875 context.setBroker(brokerService.getBroker());
876
877 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
878 advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
879 advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
880 advisoryMessage);
881
882 }
883 } catch (Exception e) {
884 LOG.warn("failed to fire forward failure advisory, cause: " + e);
885 if (LOG.isDebugEnabled()) {
886 LOG.debug("detail", e);
887 }
888 }
889 }
890 }
891
892 protected Service getControllingService() {
893 return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
894 }
895
896 protected void addSubscription(DemandSubscription sub) throws IOException {
897 if (sub != null) {
898 localBroker.oneway(sub.getLocalInfo());
899 }
900 }
901
902 protected void removeSubscription(final DemandSubscription sub) throws IOException {
903 if (sub != null) {
904 if (LOG.isTraceEnabled()) {
905 LOG.trace(configuration.getBrokerName() + " remove local subscription:" + sub.getLocalInfo().getConsumerId() + " for remote "
906 + sub.getRemoteInfo().getConsumerId());
907 }
908
909 // ensure not available for conduit subs pending removal
910 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
911 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
912
913 // continue removal in separate thread to free up this thread for outstanding responses
914 // Serialize with removeDestination operations so that removeSubs are serialized with
915 // removeDestinations such that all removeSub advisories are generated
916 serialExecutor.execute(new Runnable() {
917 @Override
918 public void run() {
919 sub.waitForCompletion();
920 try {
921 localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
922 } catch (IOException e) {
923 LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
924 }
925 }
926 });
927 }
928 }
929
930 protected Message configureMessage(MessageDispatch md) throws IOException {
931 Message message = md.getMessage().copy();
932 // Update the packet to show where it came from.
933 message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
934 message.setProducerId(producerInfo.getProducerId());
935 message.setDestination(md.getDestination());
936 message.setMemoryUsage(null);
937 if (message.getOriginalTransactionId() == null) {
938 message.setOriginalTransactionId(message.getTransactionId());
939 }
940 message.setTransactionId(null);
941 if (configuration.isUseCompression()) {
942 message.compress();
943 }
944 return message;
945 }
946
947 protected void serviceLocalCommand(Command command) {
948 if (!disposed.get()) {
949 try {
950 if (command.isMessageDispatch()) {
951 safeWaitUntilStarted();
952 enqueueCounter.incrementAndGet();
953 final MessageDispatch md = (MessageDispatch) command;
954 final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
955 if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
956
957 if (suppressMessageDispatch(md, sub)) {
958 if (LOG.isDebugEnabled()) {
959 LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName
960 + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath())
961 + ", message: " + md.getMessage());
962 }
963 // still ack as it may be durable
964 try {
965 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
966 } finally {
967 sub.decrementOutstandingResponses();
968 }
969 return;
970 }
971
972 Message message = configureMessage(md);
973 if (LOG.isDebugEnabled()) {
974 LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") "
975 + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination "
976 + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
977 }
978
979 if (isDuplex() && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
980 try {
981 // never request b/c they are eventually acked async
982 remoteBroker.oneway(message);
983 } finally {
984 sub.decrementOutstandingResponses();
985 }
986 return;
987 }
988
989 if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
990
991 // The message was not sent using async send, so we should only
992 // ack the local broker when we get confirmation that the remote
993 // broker has received the message.
994 remoteBroker.asyncRequest(message, new ResponseCallback() {
995 @Override
996 public void onCompletion(FutureResponse future) {
997 try {
998 Response response = future.getResult();
999 if (response.isException()) {
1000 ExceptionResponse er = (ExceptionResponse) response;
1001 serviceLocalException(md, er.getException());
1002 } else {
1003 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1004 dequeueCounter.incrementAndGet();
1005 }
1006 } catch (IOException e) {
1007 serviceLocalException(md, e);
1008 } finally {
1009 sub.decrementOutstandingResponses();
1010 }
1011 }
1012 });
1013
1014 } else {
1015 // If the message was originally sent using async send, we will
1016 // preserve that QOS by bridging it using an async send (small chance
1017 // of message loss).
1018 try {
1019 remoteBroker.oneway(message);
1020 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1021 dequeueCounter.incrementAndGet();
1022 } finally {
1023 sub.decrementOutstandingResponses();
1024 }
1025 }
1026 } else {
1027 if (LOG.isDebugEnabled()) {
1028 LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: "
1029 + md.getMessage());
1030 }
1031 }
1032 } else if (command.isBrokerInfo()) {
1033 futureLocalBrokerInfo.set((BrokerInfo) command);
1034 } else if (command.isShutdownInfo()) {
1035 LOG.info(configuration.getBrokerName() + " Shutting down");
1036 stop();
1037 } else if (command.getClass() == ConnectionError.class) {
1038 ConnectionError ce = (ConnectionError) command;
1039 serviceLocalException(ce.getException());
1040 } else {
1041 switch (command.getDataStructureType()) {
1042 case WireFormatInfo.DATA_STRUCTURE_TYPE:
1043 break;
1044 default:
1045 LOG.warn("Unexpected local command: " + command);
1046 }
1047 }
1048 } catch (Throwable e) {
1049 LOG.warn("Caught an exception processing local command", e);
1050 serviceLocalException(e);
1051 }
1052 }
1053 }
1054
1055 private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1056 boolean suppress = false;
1057 // for durable subs, suppression via filter leaves dangling acks so we
1058 // need to check here and allow the ack irrespective
1059 if (sub.getLocalInfo().isDurable()) {
1060 MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
1061 messageEvalContext.setMessageReference(md.getMessage());
1062 messageEvalContext.setDestination(md.getDestination());
1063 suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1064 }
1065 return suppress;
1066 }
1067
1068 public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1069 if (brokerPath != null) {
1070 for (BrokerId id : brokerPath) {
1071 if (brokerId.equals(id)) {
1072 return true;
1073 }
1074 }
1075 }
1076 return false;
1077 }
1078
1079 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1080 if (brokerPath == null || brokerPath.length == 0) {
1081 return pathsToAppend;
1082 }
1083 BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1084 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1085 System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1086 return rc;
1087 }
1088
1089 protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1090 if (brokerPath == null || brokerPath.length == 0) {
1091 return new BrokerId[] { idToAppend };
1092 }
1093 BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1094 System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1095 rc[brokerPath.length] = idToAppend;
1096 return rc;
1097 }
1098
1099 protected boolean isPermissableDestination(ActiveMQDestination destination) {
1100 return isPermissableDestination(destination, false);
1101 }
1102
1103 protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1104 // Are we not bridging temporary destinations?
1105 if (destination.isTemporary()) {
1106 if (allowTemporary) {
1107 return true;
1108 } else {
1109 return configuration.isBridgeTempDestinations();
1110 }
1111 }
1112
1113 ActiveMQDestination[] dests = staticallyIncludedDestinations;
1114 if (dests != null && dests.length > 0) {
1115 for (ActiveMQDestination dest : dests) {
1116 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1117 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1118 return true;
1119 }
1120 }
1121 }
1122
1123 dests = excludedDestinations;
1124 if (dests != null && dests.length > 0) {
1125 for (ActiveMQDestination dest : dests) {
1126 DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1127 if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1128 return false;
1129 }
1130 }
1131 }
1132
1133 dests = dynamicallyIncludedDestinations;
1134 if (dests != null && dests.length > 0) {
1135 for (ActiveMQDestination dest : dests) {
1136 DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1137 if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1138 return true;
1139 }
1140 }
1141
1142 return false;
1143 }
1144 return true;
1145 }
1146
1147 /**
1148 * Subscriptions for these destinations are always created
1149 */
1150 protected void setupStaticDestinations() {
1151 ActiveMQDestination[] dests = staticallyIncludedDestinations;
1152 if (dests != null) {
1153 for (ActiveMQDestination dest : dests) {
1154 DemandSubscription sub = createDemandSubscription(dest);
1155 try {
1156 addSubscription(sub);
1157 } catch (IOException e) {
1158 LOG.error("Failed to add static destination " + dest, e);
1159 }
1160 if (LOG.isTraceEnabled()) {
1161 LOG.trace(configuration.getBrokerName() + ", bridging messages for static destination: " + dest);
1162 }
1163 }
1164 }
1165 }
1166
1167 protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1168 boolean consumerAdded = false;
1169 ConsumerInfo info = consumerInfo.copy();
1170 addRemoteBrokerToBrokerPath(info);
1171 DemandSubscription sub = createDemandSubscription(info);
1172 if (sub != null) {
1173 if (duplicateSuppressionIsRequired(sub)) {
1174 undoMapRegistration(sub);
1175 } else {
1176 addSubscription(sub);
1177 consumerAdded = true;
1178 }
1179 }
1180 return consumerAdded;
1181 }
1182
1183 private void undoMapRegistration(DemandSubscription sub) {
1184 subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1185 subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1186 }
1187
1188 /*
1189 * check our existing subs networkConsumerIds against the list of network
1190 * ids in this subscription A match means a duplicate which we suppress for
1191 * topics and maybe for queues
1192 */
1193 private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1194 final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1195 boolean suppress = false;
1196
1197 if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
1198 && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1199 return suppress;
1200 }
1201
1202 List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1203 Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1204 for (Subscription sub : currentSubs) {
1205 List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1206 if (!networkConsumers.isEmpty()) {
1207 if (matchFound(candidateConsumers, networkConsumers)) {
1208 if (isInActiveDurableSub(sub)) {
1209 suppress = false;
1210 } else {
1211 suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1212 }
1213 break;
1214 }
1215 }
1216 }
1217 return suppress;
1218 }
1219
1220 private boolean isInActiveDurableSub(Subscription sub) {
1221 return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1222 }
1223
1224 private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1225 boolean suppress = false;
1226
1227 if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1228 if (LOG.isDebugEnabled()) {
1229 LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName + ", sub: " + candidateInfo
1230 + " is duplicated by network subscription with equal or higher network priority: " + existingSub + ", networkConsumerIds: "
1231 + existingSub.getConsumerInfo().getNetworkConsumerIds());
1232 }
1233 suppress = true;
1234 } else {
1235 // remove the existing lower priority duplicate and allow this candidate
1236 try {
1237 removeDuplicateSubscription(existingSub);
1238
1239 if (LOG.isDebugEnabled()) {
1240 LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() + " with sub from "
1241 + remoteBrokerName + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
1242 + candidateInfo.getNetworkConsumerIds());
1243 }
1244 } catch (IOException e) {
1245 LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
1246 }
1247 }
1248 return suppress;
1249 }
1250
1251 private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1252 for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1253 if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1254 break;
1255 }
1256 }
1257 }
1258
1259 private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1260 boolean found = false;
1261 for (ConsumerId aliasConsumer : networkConsumers) {
1262 if (candidateConsumers.contains(aliasConsumer)) {
1263 found = true;
1264 break;
1265 }
1266 }
1267 return found;
1268 }
1269
1270 private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1271 RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1272 Region region;
1273 Collection<Subscription> subs;
1274
1275 region = null;
1276 switch (dest.getDestinationType()) {
1277 case ActiveMQDestination.QUEUE_TYPE:
1278 region = region_broker.getQueueRegion();
1279 break;
1280 case ActiveMQDestination.TOPIC_TYPE:
1281 region = region_broker.getTopicRegion();
1282 break;
1283 case ActiveMQDestination.TEMP_QUEUE_TYPE:
1284 region = region_broker.getTempQueueRegion();
1285 break;
1286 case ActiveMQDestination.TEMP_TOPIC_TYPE:
1287 region = region_broker.getTempTopicRegion();
1288 break;
1289 }
1290
1291 if (region instanceof AbstractRegion) {
1292 subs = ((AbstractRegion) region).getSubscriptions().values();
1293 } else {
1294 subs = null;
1295 }
1296
1297 return subs;
1298 }
1299
1300 protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1301 // add our original id to ourselves
1302 info.addNetworkConsumerId(info.getConsumerId());
1303 return doCreateDemandSubscription(info);
1304 }
1305
1306 protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1307 DemandSubscription result = new DemandSubscription(info);
1308 result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1309 if (info.getDestination().isTemporary()) {
1310 // reset the local connection Id
1311 ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1312 dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1313 }
1314
1315 if (configuration.isDecreaseNetworkConsumerPriority()) {
1316 byte priority = (byte) configuration.getConsumerPriorityBase();
1317 if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1318 // The longer the path to the consumer, the less it's consumer priority.
1319 priority -= info.getBrokerPath().length + 1;
1320 }
1321 result.getLocalInfo().setPriority(priority);
1322 if (LOG.isDebugEnabled()) {
1323 LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
1324 }
1325 }
1326 configureDemandSubscription(info, result);
1327 return result;
1328 }
1329
1330 final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1331 ConsumerInfo info = new ConsumerInfo();
1332 info.setNetworkSubscription(true);
1333 info.setDestination(destination);
1334
1335 // Indicate that this subscription is being made on behalf of the remote broker.
1336 info.setBrokerPath(new BrokerId[] { remoteBrokerId });
1337
1338 // the remote info held by the DemandSubscription holds the original
1339 // consumerId, the local info get's overwritten
1340 info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1341 DemandSubscription result = null;
1342 try {
1343 result = createDemandSubscription(info);
1344 } catch (IOException e) {
1345 LOG.error("Failed to create DemandSubscription ", e);
1346 }
1347 return result;
1348 }
1349
1350 protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1351 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
1352 sub.getLocalInfo().setDispatchAsync(true);
1353 } else {
1354 sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1355 }
1356 sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1357 subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1358 subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1359
1360 sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1361 if (!info.isDurable()) {
1362 // This works for now since we use a VM connection to the local broker.
1363 // may need to change if we ever subscribe to a remote broker.
1364 sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1365 } else {
1366 // need to ack this message if it is ignored as it is durable so
1367 // we check before we send. see: suppressMessageDispatch()
1368 }
1369 }
1370
1371 protected void removeDemandSubscription(ConsumerId id) throws IOException {
1372 DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1373 if (LOG.isDebugEnabled()) {
1374 LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id
1375 + ", matching sub: " + sub);
1376 }
1377 if (sub != null) {
1378 removeSubscription(sub);
1379 if (LOG.isDebugEnabled()) {
1380 LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " : " + sub.getRemoteInfo());
1381 }
1382 }
1383 }
1384
1385 protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1386 boolean removeDone = false;
1387 DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1388 if (sub != null) {
1389 try {
1390 removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1391 removeDone = true;
1392 } catch (IOException e) {
1393 LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
1394 }
1395 }
1396 return removeDone;
1397 }
1398
1399 /**
1400 * Performs a timed wait on the started latch and then checks for disposed
1401 * before performing another wait each time the the started wait times out.
1402 *
1403 * @throws InterruptedException
1404 */
1405 protected void safeWaitUntilStarted() throws InterruptedException {
1406 while (!disposed.get()) {
1407 if (startedLatch.await(1, TimeUnit.SECONDS)) {
1408 return;
1409 }
1410 }
1411 }
1412
1413 protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1414 NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1415 if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1416 PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1417 if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1418 filterFactory = entry.getNetworkBridgeFilterFactory();
1419 }
1420 }
1421 return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL());
1422 }
1423
1424 protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1425 info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1426 }
1427
1428 protected BrokerId[] getRemoteBrokerPath() {
1429 return remoteBrokerPath;
1430 }
1431
1432 @Override
1433 public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1434 this.networkBridgeListener = listener;
1435 }
1436
1437 private void fireBridgeFailed() {
1438 NetworkBridgeListener l = this.networkBridgeListener;
1439 if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1440 l.bridgeFailed();
1441 }
1442 }
1443
1444 /**
1445 * @return Returns the dynamicallyIncludedDestinations.
1446 */
1447 public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1448 return dynamicallyIncludedDestinations;
1449 }
1450
1451 /**
1452 * @param dynamicallyIncludedDestinations
1453 * The dynamicallyIncludedDestinations to set.
1454 */
1455 public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1456 this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1457 }
1458
1459 /**
1460 * @return Returns the excludedDestinations.
1461 */
1462 public ActiveMQDestination[] getExcludedDestinations() {
1463 return excludedDestinations;
1464 }
1465
1466 /**
1467 * @param excludedDestinations
1468 * The excludedDestinations to set.
1469 */
1470 public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1471 this.excludedDestinations = excludedDestinations;
1472 }
1473
1474 /**
1475 * @return Returns the staticallyIncludedDestinations.
1476 */
1477 public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1478 return staticallyIncludedDestinations;
1479 }
1480
1481 /**
1482 * @param staticallyIncludedDestinations
1483 * The staticallyIncludedDestinations to set.
1484 */
1485 public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1486 this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1487 }
1488
1489 /**
1490 * @return Returns the durableDestinations.
1491 */
1492 public ActiveMQDestination[] getDurableDestinations() {
1493 return durableDestinations;
1494 }
1495
1496 /**
1497 * @param durableDestinations
1498 * The durableDestinations to set.
1499 */
1500 public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1501 this.durableDestinations = durableDestinations;
1502 }
1503
1504 /**
1505 * @return Returns the localBroker.
1506 */
1507 public Transport getLocalBroker() {
1508 return localBroker;
1509 }
1510
1511 /**
1512 * @return Returns the remoteBroker.
1513 */
1514 public Transport getRemoteBroker() {
1515 return remoteBroker;
1516 }
1517
1518 /**
1519 * @return the createdByDuplex
1520 */
1521 public boolean isCreatedByDuplex() {
1522 return this.createdByDuplex;
1523 }
1524
1525 /**
1526 * @param createdByDuplex
1527 * the createdByDuplex to set
1528 */
1529 public void setCreatedByDuplex(boolean createdByDuplex) {
1530 this.createdByDuplex = createdByDuplex;
1531 }
1532
1533 @Override
1534 public String getRemoteAddress() {
1535 return remoteBroker.getRemoteAddress();
1536 }
1537
1538 @Override
1539 public String getLocalAddress() {
1540 return localBroker.getRemoteAddress();
1541 }
1542
1543 @Override
1544 public String getRemoteBrokerName() {
1545 return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1546 }
1547
1548 @Override
1549 public String getLocalBrokerName() {
1550 return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1551 }
1552
1553 @Override
1554 public long getDequeueCounter() {
1555 return dequeueCounter.get();
1556 }
1557
1558 @Override
1559 public long getEnqueueCounter() {
1560 return enqueueCounter.get();
1561 }
1562
1563 protected boolean isDuplex() {
1564 return configuration.isDuplex() || createdByDuplex;
1565 }
1566
1567 public ConcurrentHashMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1568 return subscriptionMapByRemoteId;
1569 }
1570
1571 @Override
1572 public void setBrokerService(BrokerService brokerService) {
1573 this.brokerService = brokerService;
1574 this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1575 localBrokerPath[0] = localBrokerId;
1576 }
1577
1578 @Override
1579 public void setMbeanObjectName(ObjectName objectName) {
1580 this.mbeanObjectName = objectName;
1581 }
1582
1583 @Override
1584 public ObjectName getMbeanObjectName() {
1585 return mbeanObjectName;
1586 }
1587
1588 /*
1589 * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1590 * remote sides of the network bridge.
1591 */
1592 private static class FutureBrokerInfo implements Future<BrokerInfo> {
1593
1594 private final CountDownLatch slot = new CountDownLatch(1);
1595 private final AtomicBoolean disposed;
1596 private BrokerInfo info = null;
1597
1598 public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1599 this.info = info;
1600 this.disposed = disposed;
1601 }
1602
1603 @Override
1604 public boolean cancel(boolean mayInterruptIfRunning) {
1605 slot.countDown();
1606 return true;
1607 }
1608
1609 @Override
1610 public boolean isCancelled() {
1611 return slot.getCount() == 0 && info == null;
1612 }
1613
1614 @Override
1615 public boolean isDone() {
1616 return info != null;
1617 }
1618
1619 @Override
1620 public BrokerInfo get() throws InterruptedException, ExecutionException {
1621 try {
1622 if (info == null) {
1623 while (!disposed.get()) {
1624 if (slot.await(1, TimeUnit.SECONDS)) {
1625 break;
1626 }
1627 }
1628 }
1629 return info;
1630 } catch (InterruptedException e) {
1631 Thread.currentThread().interrupt();
1632 if (LOG.isDebugEnabled()) {
1633 LOG.debug("Operation interupted: " + e, e);
1634 }
1635 throw new InterruptedException("Interrupted.");
1636 }
1637 }
1638
1639 @Override
1640 public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1641 try {
1642 if (info == null) {
1643 long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1644
1645 while (!disposed.get() || System.currentTimeMillis() < deadline) {
1646 if (slot.await(1, TimeUnit.MILLISECONDS)) {
1647 break;
1648 }
1649 }
1650 if (info == null) {
1651 throw new TimeoutException();
1652 }
1653 }
1654 return info;
1655 } catch (InterruptedException e) {
1656 throw new InterruptedException("Interrupted.");
1657 }
1658 }
1659
1660 public void set(BrokerInfo info) {
1661 this.info = info;
1662 this.slot.countDown();
1663 }
1664 }
1665
1666 }