001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Properties;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.Future;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import java.util.concurrent.atomic.AtomicBoolean;
039
040import javax.management.ObjectName;
041
042import org.apache.activemq.DestinationDoesNotExistException;
043import org.apache.activemq.Service;
044import org.apache.activemq.advisory.AdvisoryBroker;
045import org.apache.activemq.advisory.AdvisorySupport;
046import org.apache.activemq.broker.BrokerService;
047import org.apache.activemq.broker.BrokerServiceAware;
048import org.apache.activemq.broker.ConnectionContext;
049import org.apache.activemq.broker.TransportConnection;
050import org.apache.activemq.broker.region.AbstractRegion;
051import org.apache.activemq.broker.region.DurableTopicSubscription;
052import org.apache.activemq.broker.region.Region;
053import org.apache.activemq.broker.region.RegionBroker;
054import org.apache.activemq.broker.region.Subscription;
055import org.apache.activemq.broker.region.policy.PolicyEntry;
056import org.apache.activemq.command.ActiveMQDestination;
057import org.apache.activemq.command.ActiveMQMessage;
058import org.apache.activemq.command.ActiveMQTempDestination;
059import org.apache.activemq.command.ActiveMQTopic;
060import org.apache.activemq.command.BrokerId;
061import org.apache.activemq.command.BrokerInfo;
062import org.apache.activemq.command.BrokerSubscriptionInfo;
063import org.apache.activemq.command.Command;
064import org.apache.activemq.command.CommandTypes;
065import org.apache.activemq.command.ConnectionError;
066import org.apache.activemq.command.ConnectionId;
067import org.apache.activemq.command.ConnectionInfo;
068import org.apache.activemq.command.ConsumerId;
069import org.apache.activemq.command.ConsumerInfo;
070import org.apache.activemq.command.DataStructure;
071import org.apache.activemq.command.DestinationInfo;
072import org.apache.activemq.command.ExceptionResponse;
073import org.apache.activemq.command.KeepAliveInfo;
074import org.apache.activemq.command.Message;
075import org.apache.activemq.command.MessageAck;
076import org.apache.activemq.command.MessageDispatch;
077import org.apache.activemq.command.MessageId;
078import org.apache.activemq.command.NetworkBridgeFilter;
079import org.apache.activemq.command.ProducerInfo;
080import org.apache.activemq.command.RemoveInfo;
081import org.apache.activemq.command.RemoveSubscriptionInfo;
082import org.apache.activemq.command.Response;
083import org.apache.activemq.command.SessionInfo;
084import org.apache.activemq.command.ShutdownInfo;
085import org.apache.activemq.command.SubscriptionInfo;
086import org.apache.activemq.command.WireFormatInfo;
087import org.apache.activemq.filter.DestinationFilter;
088import org.apache.activemq.filter.MessageEvaluationContext;
089import org.apache.activemq.security.SecurityContext;
090import org.apache.activemq.transport.DefaultTransportListener;
091import org.apache.activemq.transport.FutureResponse;
092import org.apache.activemq.transport.ResponseCallback;
093import org.apache.activemq.transport.Transport;
094import org.apache.activemq.transport.TransportDisposedIOException;
095import org.apache.activemq.transport.TransportFilter;
096import org.apache.activemq.transport.tcp.SslTransport;
097import org.apache.activemq.transport.tcp.TcpTransport;
098import org.apache.activemq.util.IdGenerator;
099import org.apache.activemq.util.IntrospectionSupport;
100import org.apache.activemq.util.LongSequenceGenerator;
101import org.apache.activemq.util.MarshallingSupport;
102import org.apache.activemq.util.NetworkBridgeUtils;
103import org.apache.activemq.util.ServiceStopper;
104import org.apache.activemq.util.ServiceSupport;
105import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109/**
110 * A useful base class for implementing demand forwarding bridges.
111 */
112public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
113    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
114    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
115    protected final Transport localBroker;
116    protected final Transport remoteBroker;
117    protected IdGenerator idGenerator = new IdGenerator();
118    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
119    protected ConnectionInfo localConnectionInfo;
120    protected ConnectionInfo remoteConnectionInfo;
121    protected SessionInfo localSessionInfo;
122    protected ProducerInfo producerInfo;
123    protected String remoteBrokerName = "Unknown";
124    protected String localClientId;
125    protected ConsumerInfo demandConsumerInfo;
126    protected int demandConsumerDispatched;
127    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
128    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
129    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
130    protected final AtomicBoolean disposed = new AtomicBoolean();
131    protected BrokerId localBrokerId;
132    protected ActiveMQDestination[] excludedDestinations;
133    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
134    protected ActiveMQDestination[] staticallyIncludedDestinations;
135    protected ActiveMQDestination[] durableDestinations;
136    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
137    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
138    protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
139    protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
140    protected final CountDownLatch startedLatch = new CountDownLatch(2);
141    protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
142    protected final CountDownLatch staticDestinationsLatch = new CountDownLatch(1);
143    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
144    protected NetworkBridgeConfiguration configuration;
145    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
146
147    protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
148    protected BrokerId remoteBrokerId;
149
150    protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics();
151
152    private NetworkBridgeListener networkBridgeListener;
153    private boolean createdByDuplex;
154    private BrokerInfo localBrokerInfo;
155    private BrokerInfo remoteBrokerInfo;
156
157    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
158    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
159
160    private final AtomicBoolean started = new AtomicBoolean();
161    private TransportConnection duplexInitiatingConnection;
162    private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
163    protected BrokerService brokerService = null;
164    private ObjectName mbeanObjectName;
165    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
166    //Use a new executor for processing BrokerSubscriptionInfo so we don't block other threads
167    private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
168    private Transport duplexInboundLocalBroker = null;
169    private ProducerInfo duplexInboundLocalProducerInfo;
170
171    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
172        this.configuration = configuration;
173        this.localBroker = localBroker;
174        this.remoteBroker = remoteBroker;
175    }
176
177    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
178        this.localBrokerInfo = localBrokerInfo;
179        this.remoteBrokerInfo = remoteBrokerInfo;
180        this.duplexInitiatingConnection = connection;
181        start();
182        serviceRemoteCommand(remoteBrokerInfo);
183    }
184
185    @Override
186    public void start() throws Exception {
187        if (started.compareAndSet(false, true)) {
188
189            if (brokerService == null) {
190                throw new IllegalArgumentException("BrokerService is null on " + this);
191            }
192
193            networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
194
195            if (isDuplex()) {
196                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
197                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
198
199                    @Override
200                    public void onCommand(Object o) {
201                        Command command = (Command) o;
202                        serviceLocalCommand(command);
203                    }
204
205                    @Override
206                    public void onException(IOException error) {
207                        serviceLocalException(error);
208                    }
209                });
210                duplexInboundLocalBroker.start();
211            }
212
213            localBroker.setTransportListener(new DefaultTransportListener() {
214
215                @Override
216                public void onCommand(Object o) {
217                    Command command = (Command) o;
218                    serviceLocalCommand(command);
219                }
220
221                @Override
222                public void onException(IOException error) {
223                    if (!futureLocalBrokerInfo.isDone()) {
224                        futureLocalBrokerInfo.cancel(true);
225                        return;
226                    }
227                    serviceLocalException(error);
228                }
229            });
230
231            remoteBroker.setTransportListener(new DefaultTransportListener() {
232
233                @Override
234                public void onCommand(Object o) {
235                    Command command = (Command) o;
236                    serviceRemoteCommand(command);
237                }
238
239                @Override
240                public void onException(IOException error) {
241                    if (!futureRemoteBrokerInfo.isDone()) {
242                        futureRemoteBrokerInfo.cancel(true);
243                        return;
244                    }
245                    serviceRemoteException(error);
246                }
247            });
248
249            remoteBroker.start();
250            localBroker.start();
251
252            if (!disposed.get()) {
253                try {
254                    triggerStartAsyncNetworkBridgeCreation();
255                } catch (IOException e) {
256                    LOG.warn("Caught exception from remote start", e);
257                }
258            } else {
259                LOG.warn("Bridge was disposed before the start() method was fully executed.");
260                throw new TransportDisposedIOException();
261            }
262        }
263    }
264
265    @Override
266    public void stop() throws Exception {
267        if (started.compareAndSet(true, false)) {
268            if (disposed.compareAndSet(false, true)) {
269                LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName);
270
271                futureRemoteBrokerInfo.cancel(true);
272                futureLocalBrokerInfo.cancel(true);
273
274                NetworkBridgeListener l = this.networkBridgeListener;
275                if (l != null) {
276                    l.onStop(this);
277                }
278                try {
279                    // local start complete
280                    if (startedLatch.getCount() < 2) {
281                        LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
282                                configuration.getBrokerName(), this, remoteBrokerName
283                        });
284                        brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
285                        brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
286                    }
287
288                    remoteBridgeStarted.set(false);
289                    final CountDownLatch sendShutdown = new CountDownLatch(1);
290
291                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
292                        @Override
293                        public void run() {
294                            try {
295                                serialExecutor.shutdown();
296                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
297                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
298                                    LOG.info("pending tasks on stop {}", pendingTasks);
299                                }
300                                //Shutdown the syncExecutor, call countDown to make sure a thread can
301                                //terminate if it is waiting
302                                staticDestinationsLatch.countDown();
303                                syncExecutor.shutdown();
304                                if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
305                                    List<Runnable> pendingTasks = syncExecutor.shutdownNow();
306                                    LOG.info("pending tasks on stop {}", pendingTasks);
307                                }
308                                localBroker.oneway(new ShutdownInfo());
309                                remoteBroker.oneway(new ShutdownInfo());
310                            } catch (Throwable e) {
311                                LOG.debug("Caught exception sending shutdown", e);
312                            } finally {
313                                sendShutdown.countDown();
314                            }
315
316                        }
317                    }, "ActiveMQ ForwardingBridge StopTask");
318
319                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
320                        LOG.info("Network Could not shutdown in a timely manner");
321                    }
322                } finally {
323                    ServiceStopper ss = new ServiceStopper();
324                    ss.stop(remoteBroker);
325                    ss.stop(localBroker);
326                    ss.stop(duplexInboundLocalBroker);
327                    // Release the started Latch since another thread could be
328                    // stuck waiting for it to start up.
329                    startedLatch.countDown();
330                    startedLatch.countDown();
331                    localStartedLatch.countDown();
332                    staticDestinationsLatch.countDown();
333
334                    ss.throwFirstException();
335                }
336            }
337
338            LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName);
339        }
340    }
341
342    protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
343        brokerService.getTaskRunnerFactory().execute(new Runnable() {
344            @Override
345            public void run() {
346                final String originalName = Thread.currentThread().getName();
347                Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
348                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
349
350                try {
351                    // First we collect the info data from both the local and remote ends
352                    collectBrokerInfos();
353
354                    // Once we have all required broker info we can attempt to start
355                    // the local and then remote sides of the bridge.
356                    doStartLocalAndRemoteBridges();
357                } finally {
358                    Thread.currentThread().setName(originalName);
359                }
360            }
361        });
362    }
363
364    private void collectBrokerInfos() {
365        int timeout = 30000;
366        TcpTransport tcpTransport = remoteBroker.narrow(TcpTransport.class);
367        if (tcpTransport != null) {
368           timeout = tcpTransport.getConnectionTimeout();
369        }
370
371        // First wait for the remote to feed us its BrokerInfo, then we can check on
372        // the LocalBrokerInfo and decide is this is a loop.
373        try {
374            remoteBrokerInfo = futureRemoteBrokerInfo.get(timeout, TimeUnit.MILLISECONDS);
375            if (remoteBrokerInfo == null) {
376                serviceLocalException(new Throwable("remoteBrokerInfo is null"));
377                return;
378            }
379        } catch (Exception e) {
380            serviceRemoteException(e);
381            return;
382        }
383
384        try {
385            localBrokerInfo = futureLocalBrokerInfo.get(timeout, TimeUnit.MILLISECONDS);
386            if (localBrokerInfo == null) {
387                serviceLocalException(new Throwable("localBrokerInfo is null"));
388                return;
389            }
390
391            // Before we try and build the bridge lets check if we are in a loop
392            // and if so just stop now before registering anything.
393            remoteBrokerId = remoteBrokerInfo.getBrokerId();
394            if (localBrokerId.equals(remoteBrokerId)) {
395                LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
396                        configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
397                });
398                ServiceSupport.dispose(localBroker);
399                ServiceSupport.dispose(remoteBroker);
400                // the bridge is left in a bit of limbo, but it won't get retried
401                // in this state.
402                return;
403            }
404
405            // Fill in the remote broker's information now.
406            remoteBrokerPath[0] = remoteBrokerId;
407            remoteBrokerName = remoteBrokerInfo.getBrokerName();
408            if (configuration.isUseBrokerNamesAsIdSeed()) {
409                idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
410            }
411        } catch (Throwable e) {
412            serviceLocalException(e);
413        }
414    }
415
416    private void doStartLocalAndRemoteBridges() {
417
418        if (disposed.get()) {
419            return;
420        }
421
422        if (isCreatedByDuplex()) {
423            // apply remote (propagated) configuration to local duplex bridge before start
424            Properties props = null;
425            try {
426                props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
427                IntrospectionSupport.getProperties(configuration, props, null);
428                if (configuration.getExcludedDestinations() != null) {
429                    excludedDestinations = configuration.getExcludedDestinations().toArray(
430                            new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
431                }
432                if (configuration.getStaticallyIncludedDestinations() != null) {
433                    staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
434                            new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
435                }
436                if (configuration.getDynamicallyIncludedDestinations() != null) {
437                    dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
438                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
439                }
440            } catch (Throwable t) {
441                LOG.error("Error mapping remote configuration: {}", props, t);
442            }
443        }
444
445        try {
446            startLocalBridge();
447        } catch (Throwable e) {
448            serviceLocalException(e);
449            return;
450        }
451
452        try {
453            startRemoteBridge();
454        } catch (Throwable e) {
455            serviceRemoteException(e);
456            return;
457        }
458
459        try {
460            if (safeWaitUntilStarted()) {
461                setupStaticDestinations();
462                staticDestinationsLatch.countDown();
463            }
464        } catch (Throwable e) {
465            serviceLocalException(e);
466        }
467    }
468
469    private void startLocalBridge() throws Throwable {
470        if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) {
471            synchronized (this) {
472                LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker);
473                if (!disposed.get()) {
474
475                    if (idGenerator == null) {
476                        throw new IllegalStateException("Id Generator cannot be null");
477                    }
478
479                    localConnectionInfo = new ConnectionInfo();
480                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
481                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
482                    localConnectionInfo.setClientId(localClientId);
483                    localConnectionInfo.setUserName(configuration.getUserName());
484                    localConnectionInfo.setPassword(configuration.getPassword());
485                    Transport originalTransport = remoteBroker;
486                    while (originalTransport instanceof TransportFilter) {
487                        originalTransport = ((TransportFilter) originalTransport).getNext();
488                    }
489                    if (originalTransport instanceof SslTransport) {
490                        X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
491                        localConnectionInfo.setTransportContext(peerCerts);
492                    }
493                    // sync requests that may fail
494                    Object resp = localBroker.request(localConnectionInfo);
495                    if (resp instanceof ExceptionResponse) {
496                        throw ((ExceptionResponse) resp).getException();
497                    }
498                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
499                    localBroker.oneway(localSessionInfo);
500
501                    if (configuration.isDuplex()) {
502                        // separate in-bound channel for forwards so we don't
503                        // contend with out-bound dispatch on same connection
504                        remoteBrokerInfo.setNetworkConnection(true);
505                        duplexInboundLocalBroker.oneway(remoteBrokerInfo);
506
507                        ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
508                        duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
509                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
510                                + configuration.getBrokerName());
511                        duplexLocalConnectionInfo.setUserName(configuration.getUserName());
512                        duplexLocalConnectionInfo.setPassword(configuration.getPassword());
513
514                        if (originalTransport instanceof SslTransport) {
515                            X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
516                            duplexLocalConnectionInfo.setTransportContext(peerCerts);
517                        }
518                        // sync requests that may fail
519                        resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
520                        if (resp instanceof ExceptionResponse) {
521                            throw ((ExceptionResponse) resp).getException();
522                        }
523                        SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
524                        duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
525                        duplexInboundLocalBroker.oneway(duplexInboundSession);
526                        duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
527                    }
528                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
529                    NetworkBridgeListener l = this.networkBridgeListener;
530                    if (l != null) {
531                        l.onStart(this);
532                    }
533
534                    // Let the local broker know the remote broker's ID.
535                    localBroker.oneway(remoteBrokerInfo);
536                    // new peer broker (a consumer can work with remote broker also)
537                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
538
539                    LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
540                            localBroker, remoteBroker, remoteBrokerName
541                    });
542                    LOG.trace("{} register bridge ({}) to {}", new Object[]{
543                            configuration.getBrokerName(), this, remoteBrokerName
544                    });
545                } else {
546                    LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
547                }
548                startedLatch.countDown();
549                localStartedLatch.countDown();
550            }
551        }
552    }
553
554    protected void startRemoteBridge() throws Exception {
555        if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) {
556            LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker);
557            synchronized (this) {
558                if (!isCreatedByDuplex()) {
559                    BrokerInfo brokerInfo = new BrokerInfo();
560                    brokerInfo.setBrokerName(configuration.getBrokerName());
561                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
562                    brokerInfo.setNetworkConnection(true);
563                    brokerInfo.setDuplexConnection(configuration.isDuplex());
564                    // set our properties
565                    Properties props = new Properties();
566                    IntrospectionSupport.getProperties(configuration, props, null);
567
568                    String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations";
569                    String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations";
570
571                    if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) {
572                        props.put(dynamicallyIncludedDestinationsKey,
573                                StringToListOfActiveMQDestinationConverter.
574                                convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true));
575                    }
576                    if (!configuration.getStaticallyIncludedDestinations().isEmpty()) {
577                        props.put(staticallyIncludedDestinationsKey,
578                                StringToListOfActiveMQDestinationConverter.
579                                convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true));
580                    }
581
582                    props.remove("networkTTL");
583                    String str = MarshallingSupport.propertiesToString(props);
584                    brokerInfo.setNetworkProperties(str);
585                    brokerInfo.setBrokerId(this.localBrokerId);
586                    remoteBroker.oneway(brokerInfo);
587                    if (configuration.isSyncDurableSubs() &&
588                            remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
589                        remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
590                                configuration));
591                    }
592                }
593                if (remoteConnectionInfo != null) {
594                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
595                }
596                remoteConnectionInfo = new ConnectionInfo();
597                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
598                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
599                remoteConnectionInfo.setUserName(configuration.getUserName());
600                remoteConnectionInfo.setPassword(configuration.getPassword());
601                remoteBroker.oneway(remoteConnectionInfo);
602
603                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
604                remoteBroker.oneway(remoteSessionInfo);
605                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
606                producerInfo.setResponseRequired(false);
607                remoteBroker.oneway(producerInfo);
608                // Listen to consumer advisory messages on the remote broker to determine demand.
609                if (!configuration.isStaticBridge()) {
610                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
611                    // always dispatch advisory message asynchronously so that
612                    // we never block the producer broker if we are slow
613                    demandConsumerInfo.setDispatchAsync(true);
614                    String advisoryTopic = configuration.getDestinationFilter();
615                    if (configuration.isBridgeTempDestinations()) {
616                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
617                    }
618                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
619                    configureConsumerPrefetch(demandConsumerInfo);
620                    remoteBroker.oneway(demandConsumerInfo);
621                }
622                startedLatch.countDown();
623            }
624        }
625    }
626
627    @Override
628    public void serviceRemoteException(Throwable error) {
629        if (!disposed.get()) {
630            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
631                LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
632                        localBroker, remoteBroker, error
633                });
634            } else {
635                LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
636                        localBroker, remoteBroker, error
637                });
638            }
639            LOG.debug("The remote Exception was: {}", error, error);
640            brokerService.getTaskRunnerFactory().execute(new Runnable() {
641                @Override
642                public void run() {
643                    ServiceSupport.dispose(getControllingService());
644                }
645            });
646            fireBridgeFailed(error);
647        }
648    }
649
    /**
     * Processes a command received from the remote broker.
     *
     * Nothing is done once the bridge has been disposed. Otherwise the command is handled by kind:
     * message dispatches are treated as consumer advisories and acknowledged; a BrokerInfo is
     * handed to {@code futureRemoteBrokerInfo}; a BrokerSubscriptionInfo is processed
     * asynchronously on {@code syncExecutor}; a ConnectionError is forwarded to
     * {@link #serviceRemoteException(Throwable)}; any other command is handled as a duplex
     * command when {@code isDuplex()} is true, and is otherwise either silently accepted
     * (keep-alive, wire format and shutdown infos) or logged as unexpected.
     *
     * Any Throwable raised while handling the command is logged at debug level and passed to
     * {@link #serviceRemoteException(Throwable)}.
     *
     * @param command the command received from the remote broker
     */
    protected void serviceRemoteCommand(Command command) {
        if (!disposed.get()) {
            try {
                if (command.isMessageDispatch()) {
                    // Dispatched message: its data structure is processed as a consumer advisory,
                    // then the message is counted towards the next advisory ack.
                    safeWaitUntilStarted();
                    MessageDispatch md = (MessageDispatch) command;
                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
                    ackAdvisory(md.getMessage());
                } else if (command.isBrokerInfo()) {
                    futureRemoteBrokerInfo.set((BrokerInfo) command);
                } else if (command instanceof BrokerSubscriptionInfo) {
                    final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command;

                    //Start in a new thread so we don't block the transport waiting for staticDestinations
                    syncExecutor.execute(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                staticDestinationsLatch.await();
                                //Make sure after the countDown of staticDestinationsLatch we aren't stopping
                                if (!disposed.get()) {
                                    BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo;
                                    LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
                                            brokerService.getBrokerName(), subInfo.getBrokerName());

                                    // Only acted upon when durable sync and conduit subscriptions are
                                    // enabled, the bridge is not dynamicOnly, and the bridge is started.
                                    if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
                                            && !configuration.isDynamicOnly()) {
                                        if (started.get()) {
                                            if (subInfo.getSubscriptionInfos() != null) {
                                                for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
                                                    //re-add and process any non-NC consumers that match the
                                                    //dynamicallyIncludedDestinations list
                                                    //(subscription names starting with DURABLE_SUB_PREFIX are skipped)
                                                    if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
                                                            NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
                                                        serviceRemoteConsumerAdvisory(info);
                                                    }
                                                }
                                            }

                                            //After re-added, clean up any empty durables
                                            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                                                DemandSubscription ds = i.next();
                                                if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
                                                    cleanupDurableSub(ds, i);
                                                }
                                            }
                                        }
                                    }
                                }
                            } catch (Exception e) {
                                // Failures here are only logged; they are not propagated to the bridge.
                                LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e);
                                LOG.debug(e.getMessage(), e);
                            }
                        }
                    });

                } else if (command.getClass() == ConnectionError.class) {
                    ConnectionError ce = (ConnectionError) command;
                    serviceRemoteException(ce.getException());
                } else {
                    if (isDuplex()) {
                        LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
                        if (command.isMessage()) {
                            final ActiveMQMessage message = (ActiveMQMessage) command;
                            if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
                                // advisory sent as a plain message: handle it like a dispatched advisory
                                serviceRemoteConsumerAdvisory(message.getDataStructure());
                                ackAdvisory(message);
                            } else {
                                // messages for destinations rejected by isPermissableDestination are dropped
                                if (!isPermissableDestination(message.getDestination(), true)) {
                                    return;
                                }
                                // message being forwarded - we need to
                                // propagate the response to our local send
                                if (canDuplexDispatch(message)) {
                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
                                            // command id captured when the callback is created, used to
                                            // correlate the reply sent back to the remote broker
                                            final int correlationId = message.getCommandId();

                                            @Override
                                            public void onCompletion(FutureResponse resp) {
                                                try {
                                                    Response reply = resp.getResult();
                                                    reply.setCorrelationId(correlationId);
                                                    remoteBroker.oneway(reply);
                                                    //increment counter when messages are received in duplex mode
                                                    networkBridgeStatistics.getReceivedCount().increment();
                                                } catch (IOException error) {
                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
                                                    serviceRemoteException(error);
                                                }
                                            }
                                        });
                                    } else {
                                        duplexInboundLocalBroker.oneway(message);
                                        networkBridgeStatistics.getReceivedCount().increment();
                                    }
                                    serviceInboundMessage(message);
                                } else {
                                    // Not dispatched locally: when a response is expected, reply with a bare
                                    // Response correlated to the message's command id (presumably so the
                                    // remote sender is not left waiting - confirm against canDuplexDispatch).
                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
                                        Response reply = new Response();
                                        reply.setCorrelationId(message.getCommandId());
                                        remoteBroker.oneway(reply);
                                    }
                                }
                            }
                        } else {
                            // non-message duplex commands are dispatched on their data structure type
                            switch (command.getDataStructureType()) {
                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
                                    // only the first ConnectionInfo goes to the initiating connection;
                                    // later ones (or all, when there is none) go to the local broker
                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
                                        // end of initiating connection setup - propagate to initial connection to get mbean by clientid
                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
                                    } else {
                                        localBroker.oneway(command);
                                    }
                                    break;
                                case SessionInfo.DATA_STRUCTURE_TYPE:
                                    localBroker.oneway(command);
                                    break;
                                case ProducerInfo.DATA_STRUCTURE_TYPE:
                                    // using duplexInboundLocalProducerInfo
                                    break;
                                case MessageAck.DATA_STRUCTURE_TYPE:
                                    // translate the remote consumer id to the local one before forwarding the ack
                                    MessageAck ack = (MessageAck) command;
                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
                                    if (localSub != null) {
                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
                                        localBroker.oneway(ack);
                                    } else {
                                        LOG.warn("Matching local subscription not found for ack: {}", ack);
                                    }
                                    break;
                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
                                    // blocks until localStartedLatch has been released
                                    localStartedLatch.await();
                                    if (started.get()) {
                                        addConsumerInfo((ConsumerInfo) command);
                                    } else {
                                        // received a subscription whilst stopping
                                        LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
                                    }
                                    break;
                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
                                    // initiator is shutting down, controlled case
                                    // abortive close dealt with by inactivity monitor
                                    LOG.info("Stopping network bridge on shutdown of remote broker");
                                    serviceRemoteException(new IOException(command.toString()));
                                    break;
                                default:
                                    LOG.debug("Ignoring remote command: {}", command);
                            }
                        }
                    } else {
                        // non-duplex bridge: these three command types are accepted silently,
                        // anything else is reported as unexpected
                        switch (command.getDataStructureType()) {
                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
                                break;
                            default:
                                LOG.warn("Unexpected remote command: {}", command);
                        }
                    }
                }
            } catch (Throwable e) {
                LOG.debug("Exception processing remote command: {}", command, e);
                serviceRemoteException(e);
            }
        }
    }
819
820    private void ackAdvisory(Message message) throws IOException {
821        demandConsumerDispatched++;
822        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
823                (configuration.getAdvisoryAckPercentage() / 100f))) {
824            MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
825            ack.setConsumerId(demandConsumerInfo.getConsumerId());
826            remoteBroker.oneway(ack);
827            demandConsumerDispatched = 0;
828        }
829    }
830
831    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
832        final int networkTTL = configuration.getConsumerTTL();
833        if (data.getClass() == ConsumerInfo.class) {
834            // Create a new local subscription
835            ConsumerInfo info = (ConsumerInfo) data;
836            BrokerId[] path = info.getBrokerPath();
837
838            if (info.isBrowser()) {
839                LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
840                return;
841            }
842
843            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
844                LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
845                        configuration.getBrokerName(), remoteBrokerName, networkTTL, info
846                });
847                return;
848            }
849
850            if (contains(path, localBrokerPath[0])) {
851                // Ignore this consumer as it's a consumer we locally sent to the broker.
852                LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
853                        configuration.getBrokerName(), remoteBrokerName, info
854                });
855                return;
856            }
857
858            if (!isPermissableDestination(info.getDestination())) {
859                // ignore if not in the permitted or in the excluded list
860                LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
861                        configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
862                });
863                return;
864            }
865
866            // in a cyclic network there can be multiple bridges per broker that can propagate
867            // a network subscription so there is a need to synchronize on a shared entity
868            synchronized (brokerService.getVmConnectorURI()) {
869                addConsumerInfo(info);
870            }
871        } else if (data.getClass() == DestinationInfo.class) {
872            // It's a destination info - we want to pass up information about temporary destinations
873            final DestinationInfo destInfo = (DestinationInfo) data;
874            BrokerId[] path = destInfo.getBrokerPath();
875            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
876                LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
877                        configuration.getBrokerName(), destInfo, networkTTL
878                });
879                return;
880            }
881            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
882                LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
883                return;
884            }
885            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
886            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
887                // re-set connection id so comes from here
888                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
889                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
890            }
891            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
892            LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
893                    configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
894            });
895            if (destInfo.isRemoveOperation()) {
896                // Serialize with removeSub operations such that all removeSub advisories
897                // are generated
898                serialExecutor.execute(new Runnable() {
899                    @Override
900                    public void run() {
901                        try {
902                            localBroker.oneway(destInfo);
903                        } catch (IOException e) {
904                            LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
905                        }
906                    }
907                });
908            } else {
909                localBroker.oneway(destInfo);
910            }
911        } else if (data.getClass() == RemoveInfo.class) {
912            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
913            removeDemandSubscription(id);
914
915            if (forcedDurableRemoteId.remove(id)) {
916                for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
917                    DemandSubscription ds = i.next();
918                    boolean removed = ds.removeForcedDurableConsumer(id);
919                    if (removed) {
920                        cleanupDurableSub(ds, i);
921                    }
922                }
923           }
924
925        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
926            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
927            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
928            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
929                DemandSubscription ds = i.next();
930                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
931                if (removed) {
932                    cleanupDurableSub(ds, i);
933                }
934            }
935        }
936    }
937
938    private void cleanupDurableSub(final DemandSubscription ds,
939            Iterator<DemandSubscription> i) throws IOException {
940        if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
941                && ds.getForcedDurableConsumersSize() == 0) {
942            // deactivate subscriber
943            RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
944            localBroker.oneway(removeInfo);
945
946            // remove subscriber
947            RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
948            sending.setClientId(localClientId);
949            sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
950            sending.setConnectionId(this.localConnectionInfo.getConnectionId());
951            localBroker.oneway(sending);
952
953            //remove subscriber from map
954            i.remove();
955        }
956    }
957
    /**
     * Handles a failure on the local side of the bridge that is not associated with a
     * particular message dispatch. Delegates to
     * {@link #serviceLocalException(MessageDispatch, Throwable)} with a {@code null} dispatch.
     *
     * @param error the failure to handle
     */
    @Override
    public void serviceLocalException(Throwable error) {
        serviceLocalException(null, error);
    }
962
963    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
964        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
965        if (!disposed.get()) {
966            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
967                // not a reason to terminate the bridge - temps can disappear with
968                // pending sends as the demand sub may outlive the remote dest
969                if (messageDispatch != null) {
970                    LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
971                    try {
972                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
973                        poisonAck.setPoisonCause(error);
974                        localBroker.oneway(poisonAck);
975                    } catch (IOException ioe) {
976                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
977                    }
978                    fireFailedForwardAdvisory(messageDispatch, error);
979                } else {
980                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
981                }
982                return;
983            }
984
985            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
986            LOG.debug("The local Exception was: {}", error, error);
987
988            brokerService.getTaskRunnerFactory().execute(new Runnable() {
989                @Override
990                public void run() {
991                    ServiceSupport.dispose(getControllingService());
992                }
993            });
994            fireBridgeFailed(error);
995        }
996    }
997
998    private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
999        if (configuration.isAdvisoryForFailedForward()) {
1000            AdvisoryBroker advisoryBroker = null;
1001            try {
1002                advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
1003
1004                if (advisoryBroker != null) {
1005                    ConnectionContext context = new ConnectionContext();
1006                    context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
1007                    context.setBroker(brokerService.getBroker());
1008
1009                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
1010                    advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
1011                    advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
1012                            advisoryMessage);
1013
1014                }
1015            } catch (Exception e) {
1016                LOG.warn("failed to fire forward failure advisory, cause: {}", e);
1017                LOG.debug("detail", e);
1018            }
1019        }
1020    }
1021
1022    protected Service getControllingService() {
1023        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
1024    }
1025
1026    protected void addSubscription(DemandSubscription sub) throws IOException {
1027        if (sub != null) {
1028            if (isDuplex()) {
1029                // async vm transport, need to wait for completion
1030                localBroker.request(sub.getLocalInfo());
1031            } else {
1032                localBroker.oneway(sub.getLocalInfo());
1033            }
1034        }
1035    }
1036
1037    protected void removeSubscription(final DemandSubscription sub) throws IOException {
1038        if (sub != null) {
1039            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
1040
1041            // ensure not available for conduit subs pending removal
1042            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1043            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1044
1045            // continue removal in separate thread to free up this thread for outstanding responses
1046            // Serialize with removeDestination operations so that removeSubs are serialized with
1047            // removeDestinations such that all removeSub advisories are generated
1048            serialExecutor.execute(new Runnable() {
1049                @Override
1050                public void run() {
1051                    sub.waitForCompletion();
1052                    try {
1053                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
1054                    } catch (IOException e) {
1055                        LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
1056                    }
1057                }
1058            });
1059        }
1060    }
1061
1062    protected Message configureMessage(MessageDispatch md) throws IOException {
1063        Message message = md.getMessage().copy();
1064        // Update the packet to show where it came from.
1065        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
1066        message.setProducerId(producerInfo.getProducerId());
1067        message.setDestination(md.getDestination());
1068        message.setMemoryUsage(null);
1069        if (message.getOriginalTransactionId() == null) {
1070            message.setOriginalTransactionId(message.getTransactionId());
1071        }
1072        message.setTransactionId(null);
1073        if (configuration.isUseCompression()) {
1074            message.compress();
1075        }
1076        return message;
1077    }
1078
1079    protected void serviceLocalCommand(Command command) {
1080        if (!disposed.get()) {
1081            try {
1082                if (command.isMessageDispatch()) {
1083                    safeWaitUntilStarted();
1084                    networkBridgeStatistics.getEnqueues().increment();
1085                    final MessageDispatch md = (MessageDispatch) command;
1086                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
1087                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
1088
1089                        if (suppressMessageDispatch(md, sub)) {
1090                            LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
1091                                    configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
1092                            });
1093                            // still ack as it may be durable
1094                            try {
1095                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1096                            } finally {
1097                                sub.decrementOutstandingResponses();
1098                            }
1099                            return;
1100                        }
1101
1102                        Message message = configureMessage(md);
1103                        LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
1104                                configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
1105                        });
1106                        if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
1107                            try {
1108                                // never request b/c they are eventually                     acked async
1109                                remoteBroker.oneway(message);
1110                            } finally {
1111                                sub.decrementOutstandingResponses();
1112                            }
1113                            return;
1114                        }
1115                        if (isPermissableDestination(md.getDestination())) {
1116                           if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
1117
1118                              // The message was not sent using async send, so we should only
1119                              // ack the local broker when we get confirmation that the remote
1120                              // broker has received the message.
1121                              remoteBroker.asyncRequest(message, new ResponseCallback() {
1122                                 @Override
1123                                 public void onCompletion(FutureResponse future) {
1124                                    try {
1125                                       Response response = future.getResult();
1126                                       if (response.isException()) {
1127                                          ExceptionResponse er = (ExceptionResponse) response;
1128                                          serviceLocalException(md, er.getException());
1129                                       } else {
1130                                          localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1131                                          networkBridgeStatistics.getDequeues().increment();
1132                                       }
1133                                    } catch (IOException e) {
1134                                       serviceLocalException(md, e);
1135                                    } finally {
1136                                       sub.decrementOutstandingResponses();
1137                                    }
1138                                 }
1139                              });
1140
1141                           } else {
1142                              // If the message was originally sent using async send, we will
1143                              // preserve that QOS by bridging it using an async send (small chance
1144                              // of message loss).
1145                              try {
1146                                 remoteBroker.oneway(message);
1147                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1148                                 networkBridgeStatistics.getDequeues().increment();
1149                              } finally {
1150                                 sub.decrementOutstandingResponses();
1151                              }
1152                           }
1153                           serviceOutbound(message);
1154                        }
1155                    } else {
1156                        LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
1157                    }
1158                } else if (command.isBrokerInfo()) {
1159                    futureLocalBrokerInfo.set((BrokerInfo) command);
1160                } else if (command.isShutdownInfo()) {
1161                    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
1162                    stop();
1163                } else if (command.getClass() == ConnectionError.class) {
1164                    ConnectionError ce = (ConnectionError) command;
1165                    serviceLocalException(ce.getException());
1166                } else {
1167                    switch (command.getDataStructureType()) {
1168                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
1169                            break;
1170                        case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE:
1171                            break;
1172                        default:
1173                            LOG.warn("Unexpected local command: {}", command);
1174                    }
1175                }
1176            } catch (Throwable e) {
1177                LOG.warn("Caught an exception processing local command", e);
1178                serviceLocalException(e);
1179            }
1180        }
1181    }
1182
1183    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1184        boolean suppress = false;
1185        // for durable subs, suppression via filter leaves dangling acks so we
1186        // need to check here and allow the ack irrespective
1187        if (sub.getLocalInfo().isDurable()) {
1188            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
1189            messageEvalContext.setMessageReference(md.getMessage());
1190            messageEvalContext.setDestination(md.getDestination());
1191            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1192            //AMQ-6465 - Need to decrement the reference count after checking matches() as
1193            //the call above will increment the reference count by 1
1194            messageEvalContext.getMessageReference().decrementReferenceCount();
1195        }
1196        return suppress;
1197    }
1198
1199    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1200        if (brokerPath != null) {
1201            for (BrokerId id : brokerPath) {
1202                if (brokerId.equals(id)) {
1203                    return true;
1204                }
1205            }
1206        }
1207        return false;
1208    }
1209
1210    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1211        if (brokerPath == null || brokerPath.length == 0) {
1212            return pathsToAppend;
1213        }
1214        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1215        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1216        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1217        return rc;
1218    }
1219
1220    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1221        if (brokerPath == null || brokerPath.length == 0) {
1222            return new BrokerId[]{idToAppend};
1223        }
1224        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1225        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1226        rc[brokerPath.length] = idToAppend;
1227        return rc;
1228    }
1229
1230    protected boolean isPermissableDestination(ActiveMQDestination destination) {
1231        return isPermissableDestination(destination, false);
1232    }
1233
1234    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1235        // Are we not bridging temporary destinations?
1236        if (destination.isTemporary()) {
1237            if (allowTemporary) {
1238                return true;
1239            } else {
1240                return configuration.isBridgeTempDestinations();
1241            }
1242        }
1243
1244        ActiveMQDestination[] dests = excludedDestinations;
1245        if (dests != null && dests.length > 0) {
1246            for (ActiveMQDestination dest : dests) {
1247                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1248                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1249                    return false;
1250                }
1251            }
1252        }
1253
1254        dests = staticallyIncludedDestinations;
1255        if (dests != null && dests.length > 0) {
1256            for (ActiveMQDestination dest : dests) {
1257                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1258                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1259                    return true;
1260                }
1261            }
1262        }
1263
1264        dests = dynamicallyIncludedDestinations;
1265        if (dests != null && dests.length > 0) {
1266            for (ActiveMQDestination dest : dests) {
1267                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1268                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1269                    return true;
1270                }
1271            }
1272
1273            return false;
1274        }
1275
1276        return true;
1277    }
1278
1279    /**
1280     * Subscriptions for these destinations are always created
1281     */
1282    protected void setupStaticDestinations() {
1283        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1284        if (dests != null) {
1285            for (ActiveMQDestination dest : dests) {
1286                if (isPermissableDestination(dest)) {
1287                    DemandSubscription sub = createDemandSubscription(dest, null);
1288                    sub.setStaticallyIncluded(true);
1289                    try {
1290                        addSubscription(sub);
1291                    } catch (IOException e) {
1292                        LOG.error("Failed to add static destination {}", dest, e);
1293                    }
1294                    LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
1295                } else {
1296                    LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
1297                }
1298            }
1299        }
1300    }
1301
1302    protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1303        ConsumerInfo info = consumerInfo.copy();
1304        addRemoteBrokerToBrokerPath(info);
1305        DemandSubscription sub = createDemandSubscription(info);
1306        if (sub != null) {
1307            if (duplicateSuppressionIsRequired(sub)) {
1308                undoMapRegistration(sub);
1309            } else {
1310                if (consumerInfo.isDurable()) {
1311                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
1312                }
1313                addSubscription(sub);
1314                LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);
1315            }
1316        }
1317    }
1318
1319    private void undoMapRegistration(DemandSubscription sub) {
1320        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1321        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1322    }
1323
1324    /*
1325     * check our existing subs networkConsumerIds against the list of network
1326     * ids in this subscription A match means a duplicate which we suppress for
1327     * topics and maybe for queues
1328     */
1329    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1330        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1331        boolean suppress = false;
1332
1333        if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
1334                && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1335            return suppress;
1336        }
1337
1338        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1339        Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1340        for (Subscription sub : currentSubs) {
1341            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1342            if (!networkConsumers.isEmpty()) {
1343                if (matchFound(candidateConsumers, networkConsumers)) {
1344                    if (isInActiveDurableSub(sub)) {
1345                        suppress = false;
1346                    } else {
1347                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1348                    }
1349                    break;
1350                }
1351            }
1352        }
1353        return suppress;
1354    }
1355
1356    private boolean isInActiveDurableSub(Subscription sub) {
1357        return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1358    }
1359
1360    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1361        boolean suppress = false;
1362
1363        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1364            LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
1365                    configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
1366            });
1367            suppress = true;
1368        } else {
1369            // remove the existing lower priority duplicate and allow this candidate
1370            try {
1371                removeDuplicateSubscription(existingSub);
1372
1373                LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
1374                        configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
1375                });
1376            } catch (IOException e) {
1377                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
1378            }
1379        }
1380        return suppress;
1381    }
1382
1383    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1384        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1385            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1386                break;
1387            }
1388        }
1389    }
1390
1391    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1392        boolean found = false;
1393        for (ConsumerId aliasConsumer : networkConsumers) {
1394            if (candidateConsumers.contains(aliasConsumer)) {
1395                found = true;
1396                break;
1397            }
1398        }
1399        return found;
1400    }
1401
1402    protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1403        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1404        Region region;
1405        Collection<Subscription> subs;
1406
1407        region = null;
1408        switch (dest.getDestinationType()) {
1409            case ActiveMQDestination.QUEUE_TYPE:
1410                region = region_broker.getQueueRegion();
1411                break;
1412            case ActiveMQDestination.TOPIC_TYPE:
1413                region = region_broker.getTopicRegion();
1414                break;
1415            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1416                region = region_broker.getTempQueueRegion();
1417                break;
1418            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1419                region = region_broker.getTempTopicRegion();
1420                break;
1421        }
1422
1423        if (region instanceof AbstractRegion) {
1424            subs = ((AbstractRegion) region).getSubscriptions().values();
1425        } else {
1426            subs = null;
1427        }
1428
1429        return subs;
1430    }
1431
1432    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1433        // add our original id to ourselves
1434        info.addNetworkConsumerId(info.getConsumerId());
1435        return doCreateDemandSubscription(info);
1436    }
1437
1438    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1439        DemandSubscription result = new DemandSubscription(info);
1440        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1441        if (info.getDestination().isTemporary()) {
1442            // reset the local connection Id
1443            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1444            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1445        }
1446
1447        if (configuration.isDecreaseNetworkConsumerPriority()) {
1448            byte priority = (byte) configuration.getConsumerPriorityBase();
1449            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1450                // The longer the path to the consumer, the less it's consumer priority.
1451                priority -= info.getBrokerPath().length + 1;
1452            }
1453            result.getLocalInfo().setPriority(priority);
1454            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
1455        }
1456        configureDemandSubscription(info, result);
1457        return result;
1458    }
1459
1460    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination, final String subscriptionName) {
1461        ConsumerInfo info = new ConsumerInfo();
1462        info.setNetworkSubscription(true);
1463        info.setDestination(destination);
1464
1465        if (subscriptionName != null) {
1466            info.setSubscriptionName(subscriptionName);
1467        }
1468
1469        // Indicate that this subscription is being made on behalf of the remote broker.
1470        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
1471
1472        // the remote info held by the DemandSubscription holds the original
1473        // consumerId, the local info get's overwritten
1474        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1475        DemandSubscription result = null;
1476        try {
1477            result = createDemandSubscription(info);
1478        } catch (IOException e) {
1479            LOG.error("Failed to create DemandSubscription ", e);
1480        }
1481        return result;
1482    }
1483
1484    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1485        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
1486                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
1487            sub.getLocalInfo().setDispatchAsync(true);
1488        } else {
1489            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1490        }
1491        configureConsumerPrefetch(sub.getLocalInfo());
1492        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1493        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1494
1495        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1496        if (!info.isDurable()) {
1497            // This works for now since we use a VM connection to the local broker.
1498            // may need to change if we ever subscribe to a remote broker.
1499            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1500        } else {
1501            sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
1502        }
1503    }
1504
1505    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1506        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1507        LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
1508                configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
1509        });
1510        if (sub != null) {
1511            removeSubscription(sub);
1512            LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
1513                    configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
1514            });
1515        }
1516    }
1517
1518    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1519        boolean removeDone = false;
1520        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1521        if (sub != null) {
1522            try {
1523                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1524                removeDone = true;
1525            } catch (IOException e) {
1526                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e);
1527            }
1528        }
1529        return removeDone;
1530    }
1531
1532    /**
1533     * Performs a timed wait on the started latch and then checks for disposed
1534     * before performing another wait each time the the started wait times out.
1535     */
1536    protected boolean safeWaitUntilStarted() throws InterruptedException {
1537        while (!disposed.get()) {
1538            if (startedLatch.await(1, TimeUnit.SECONDS)) {
1539                break;
1540            }
1541        }
1542        return !disposed.get();
1543    }
1544
1545    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1546        NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1547        if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1548            PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1549            if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1550                filterFactory = entry.getNetworkBridgeFilterFactory();
1551            }
1552        }
1553        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
1554    }
1555
1556    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1557        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1558    }
1559
1560    protected BrokerId[] getRemoteBrokerPath() {
1561        return remoteBrokerPath;
1562    }
1563
1564    @Override
1565    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1566        this.networkBridgeListener = listener;
1567    }
1568
1569    private void fireBridgeFailed(Throwable reason) {
1570        LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
1571        NetworkBridgeListener l = this.networkBridgeListener;
1572        if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1573            l.bridgeFailed();
1574        }
1575    }
1576
1577    /**
1578     * @return Returns the dynamicallyIncludedDestinations.
1579     */
1580    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1581        return dynamicallyIncludedDestinations;
1582    }
1583
1584    /**
1585     * @param dynamicallyIncludedDestinations
1586     *         The dynamicallyIncludedDestinations to set.
1587     */
1588    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1589        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1590    }
1591
1592    /**
1593     * @return Returns the excludedDestinations.
1594     */
1595    public ActiveMQDestination[] getExcludedDestinations() {
1596        return excludedDestinations;
1597    }
1598
1599    /**
1600     * @param excludedDestinations The excludedDestinations to set.
1601     */
1602    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1603        this.excludedDestinations = excludedDestinations;
1604    }
1605
1606    /**
1607     * @return Returns the staticallyIncludedDestinations.
1608     */
1609    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1610        return staticallyIncludedDestinations;
1611    }
1612
1613    /**
1614     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
1615     */
1616    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1617        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1618    }
1619
1620    /**
1621     * @return Returns the durableDestinations.
1622     */
1623    public ActiveMQDestination[] getDurableDestinations() {
1624        return durableDestinations;
1625    }
1626
1627    /**
1628     * @param durableDestinations The durableDestinations to set.
1629     */
1630    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1631        this.durableDestinations = durableDestinations;
1632    }
1633
1634    /**
1635     * @return Returns the localBroker.
1636     */
1637    public Transport getLocalBroker() {
1638        return localBroker;
1639    }
1640
1641    /**
1642     * @return Returns the remoteBroker.
1643     */
1644    public Transport getRemoteBroker() {
1645        return remoteBroker;
1646    }
1647
1648    /**
1649     * @return the createdByDuplex
1650     */
1651    public boolean isCreatedByDuplex() {
1652        return this.createdByDuplex;
1653    }
1654
1655    /**
1656     * @param createdByDuplex the createdByDuplex to set
1657     */
1658    public void setCreatedByDuplex(boolean createdByDuplex) {
1659        this.createdByDuplex = createdByDuplex;
1660    }
1661
1662    @Override
1663    public String getRemoteAddress() {
1664        return remoteBroker.getRemoteAddress();
1665    }
1666
1667    @Override
1668    public String getLocalAddress() {
1669        return localBroker.getRemoteAddress();
1670    }
1671
1672    @Override
1673    public String getRemoteBrokerName() {
1674        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1675    }
1676
1677    @Override
1678    public String getRemoteBrokerId() {
1679        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
1680    }
1681
1682    @Override
1683    public String getLocalBrokerName() {
1684        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1685    }
1686
1687    @Override
1688    public long getDequeueCounter() {
1689        return networkBridgeStatistics.getDequeues().getCount();
1690    }
1691
1692    @Override
1693    public long getEnqueueCounter() {
1694        return networkBridgeStatistics.getEnqueues().getCount();
1695    }
1696
1697    @Override
1698    public NetworkBridgeStatistics getNetworkBridgeStatistics() {
1699        return networkBridgeStatistics;
1700    }
1701
1702    protected boolean isDuplex() {
1703        return configuration.isDuplex() || createdByDuplex;
1704    }
1705
1706    public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1707        return subscriptionMapByRemoteId;
1708    }
1709
1710    @Override
1711    public void setBrokerService(BrokerService brokerService) {
1712        this.brokerService = brokerService;
1713        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1714        localBrokerPath[0] = localBrokerId;
1715    }
1716
1717    @Override
1718    public void setMbeanObjectName(ObjectName objectName) {
1719        this.mbeanObjectName = objectName;
1720    }
1721
1722    @Override
1723    public ObjectName getMbeanObjectName() {
1724        return mbeanObjectName;
1725    }
1726
1727    @Override
1728    public void resetStats() {
1729        networkBridgeStatistics.reset();
1730    }
1731
1732    /*
1733     * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1734     * remote sides of the network bridge.
1735     */
1736    private static class FutureBrokerInfo implements Future<BrokerInfo> {
1737
1738        private final CountDownLatch slot = new CountDownLatch(1);
1739        private final AtomicBoolean disposed;
1740        private volatile BrokerInfo info = null;
1741
1742        public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1743            this.info = info;
1744            this.disposed = disposed;
1745        }
1746
1747        @Override
1748        public boolean cancel(boolean mayInterruptIfRunning) {
1749            slot.countDown();
1750            return true;
1751        }
1752
1753        @Override
1754        public boolean isCancelled() {
1755            return slot.getCount() == 0 && info == null;
1756        }
1757
1758        @Override
1759        public boolean isDone() {
1760            return info != null;
1761        }
1762
1763        @Override
1764        public BrokerInfo get() throws InterruptedException, ExecutionException {
1765            try {
1766                if (info == null) {
1767                    while (!disposed.get()) {
1768                        if (slot.await(1, TimeUnit.SECONDS)) {
1769                            break;
1770                        }
1771                    }
1772                }
1773                return info;
1774            } catch (InterruptedException e) {
1775                Thread.currentThread().interrupt();
1776                LOG.debug("Operation interrupted: {}", e, e);
1777                throw new InterruptedException("Interrupted.");
1778            }
1779        }
1780
1781        @Override
1782        public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1783            try {
1784                if (info == null) {
1785                    long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1786
1787                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
1788                        if (slot.await(1, TimeUnit.MILLISECONDS)) {
1789                            break;
1790                        }
1791                    }
1792                    if (info == null) {
1793                        throw new TimeoutException();
1794                    }
1795                }
1796                return info;
1797            } catch (InterruptedException e) {
1798                throw new InterruptedException("Interrupted.");
1799            }
1800        }
1801
1802        public void set(BrokerInfo info) {
1803            this.info = info;
1804            this.slot.countDown();
1805        }
1806    }
1807
1808    protected void serviceOutbound(Message message) {
1809        NetworkBridgeListener l = this.networkBridgeListener;
1810        if (l != null) {
1811            l.onOutboundMessage(this, message);
1812        }
1813    }
1814
1815    protected void serviceInboundMessage(Message message) {
1816        NetworkBridgeListener l = this.networkBridgeListener;
1817        if (l != null) {
1818            l.onInboundMessage(this, message);
1819        }
1820    }
1821
1822    protected boolean canDuplexDispatch(Message message) {
1823        boolean result = true;
1824        if (configuration.isCheckDuplicateMessagesOnDuplex()){
1825            final long producerSequenceId = message.getMessageId().getProducerSequenceId();
1826            //  messages are multiplexed on this producer so we need to query the persistenceAdapter
1827            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
1828            if (producerSequenceId <= lastStoredForMessageProducer) {
1829                result = false;
1830                LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
1831                        (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
1832                });
1833            }
1834        }
1835        return result;
1836    }
1837
1838    protected long getStoredSequenceIdForMessage(MessageId messageId) {
1839        try {
1840            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
1841        } catch (IOException ignored) {
1842            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
1843        }
1844        return -1;
1845    }
1846
1847    protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) {
1848        //If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly
1849        //set then use it, else default to the prefetchSize setting
1850        if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) &&
1851                configuration.getAdvisoryPrefetchSize() > 0) {
1852            consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize());
1853        } else {
1854            consumerInfo.setPrefetchSize(configuration.getPrefetchSize());
1855        }
1856    }
1857
1858}