001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Properties;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.Future;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import java.util.concurrent.atomic.AtomicBoolean;
039
040import javax.management.ObjectName;
041
042import org.apache.activemq.DestinationDoesNotExistException;
043import org.apache.activemq.Service;
044import org.apache.activemq.advisory.AdvisoryBroker;
045import org.apache.activemq.advisory.AdvisorySupport;
046import org.apache.activemq.broker.BrokerService;
047import org.apache.activemq.broker.BrokerServiceAware;
048import org.apache.activemq.broker.ConnectionContext;
049import org.apache.activemq.broker.TransportConnection;
050import org.apache.activemq.broker.region.AbstractRegion;
051import org.apache.activemq.broker.region.DurableTopicSubscription;
052import org.apache.activemq.broker.region.Region;
053import org.apache.activemq.broker.region.RegionBroker;
054import org.apache.activemq.broker.region.Subscription;
055import org.apache.activemq.broker.region.policy.PolicyEntry;
056import org.apache.activemq.command.ActiveMQDestination;
057import org.apache.activemq.command.ActiveMQMessage;
058import org.apache.activemq.command.ActiveMQTempDestination;
059import org.apache.activemq.command.ActiveMQTopic;
060import org.apache.activemq.command.BrokerId;
061import org.apache.activemq.command.BrokerInfo;
062import org.apache.activemq.command.BrokerSubscriptionInfo;
063import org.apache.activemq.command.Command;
064import org.apache.activemq.command.CommandTypes;
065import org.apache.activemq.command.ConnectionError;
066import org.apache.activemq.command.ConnectionId;
067import org.apache.activemq.command.ConnectionInfo;
068import org.apache.activemq.command.ConsumerId;
069import org.apache.activemq.command.ConsumerInfo;
070import org.apache.activemq.command.DataStructure;
071import org.apache.activemq.command.DestinationInfo;
072import org.apache.activemq.command.ExceptionResponse;
073import org.apache.activemq.command.KeepAliveInfo;
074import org.apache.activemq.command.Message;
075import org.apache.activemq.command.MessageAck;
076import org.apache.activemq.command.MessageDispatch;
077import org.apache.activemq.command.MessageId;
078import org.apache.activemq.command.NetworkBridgeFilter;
079import org.apache.activemq.command.ProducerInfo;
080import org.apache.activemq.command.RemoveInfo;
081import org.apache.activemq.command.RemoveSubscriptionInfo;
082import org.apache.activemq.command.Response;
083import org.apache.activemq.command.SessionInfo;
084import org.apache.activemq.command.ShutdownInfo;
085import org.apache.activemq.command.SubscriptionInfo;
086import org.apache.activemq.command.WireFormatInfo;
087import org.apache.activemq.filter.DestinationFilter;
088import org.apache.activemq.filter.MessageEvaluationContext;
089import org.apache.activemq.security.SecurityContext;
090import org.apache.activemq.transport.DefaultTransportListener;
091import org.apache.activemq.transport.FutureResponse;
092import org.apache.activemq.transport.ResponseCallback;
093import org.apache.activemq.transport.Transport;
094import org.apache.activemq.transport.TransportDisposedIOException;
095import org.apache.activemq.transport.TransportFilter;
096import org.apache.activemq.transport.tcp.SslTransport;
097import org.apache.activemq.transport.tcp.TcpTransport;
098import org.apache.activemq.util.IdGenerator;
099import org.apache.activemq.util.IntrospectionSupport;
100import org.apache.activemq.util.LongSequenceGenerator;
101import org.apache.activemq.util.MarshallingSupport;
102import org.apache.activemq.util.ServiceStopper;
103import org.apache.activemq.util.ServiceSupport;
104import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
105import org.slf4j.Logger;
106import org.slf4j.LoggerFactory;
107
108/**
109 * A useful base class for implementing demand forwarding bridges.
110 */
111public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
112    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
113    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
114    protected final Transport localBroker;
115    protected final Transport remoteBroker;
116    protected IdGenerator idGenerator = new IdGenerator();
117    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
118    protected ConnectionInfo localConnectionInfo;
119    protected ConnectionInfo remoteConnectionInfo;
120    protected SessionInfo localSessionInfo;
121    protected ProducerInfo producerInfo;
122    protected String remoteBrokerName = "Unknown";
123    protected String localClientId;
124    protected ConsumerInfo demandConsumerInfo;
125    protected int demandConsumerDispatched;
126    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
127    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
128    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
129    protected final AtomicBoolean disposed = new AtomicBoolean();
130    protected BrokerId localBrokerId;
131    protected ActiveMQDestination[] excludedDestinations;
132    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
133    protected ActiveMQDestination[] staticallyIncludedDestinations;
134    protected ActiveMQDestination[] durableDestinations;
135    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
136    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
137    protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
138    protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
139    protected final CountDownLatch startedLatch = new CountDownLatch(2);
140    protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
141    protected final CountDownLatch staticDestinationsLatch = new CountDownLatch(1);
142    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
143    protected NetworkBridgeConfiguration configuration;
144    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
145
146    protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
147    protected BrokerId remoteBrokerId;
148
149    protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics();
150
151    private NetworkBridgeListener networkBridgeListener;
152    private boolean createdByDuplex;
153    private BrokerInfo localBrokerInfo;
154    private BrokerInfo remoteBrokerInfo;
155
156    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
157    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
158
159    private final AtomicBoolean started = new AtomicBoolean();
160    private TransportConnection duplexInitiatingConnection;
161    private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
162    protected BrokerService brokerService = null;
163    private ObjectName mbeanObjectName;
164    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
165    private Transport duplexInboundLocalBroker = null;
166    private ProducerInfo duplexInboundLocalProducerInfo;
167
168    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
169        this.configuration = configuration;
170        this.localBroker = localBroker;
171        this.remoteBroker = remoteBroker;
172    }
173
174    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
175        this.localBrokerInfo = localBrokerInfo;
176        this.remoteBrokerInfo = remoteBrokerInfo;
177        this.duplexInitiatingConnection = connection;
178        start();
179        serviceRemoteCommand(remoteBrokerInfo);
180    }
181
182    @Override
183    public void start() throws Exception {
184        if (started.compareAndSet(false, true)) {
185
186            if (brokerService == null) {
187                throw new IllegalArgumentException("BrokerService is null on " + this);
188            }
189
190            networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
191
192            if (isDuplex()) {
193                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
194                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
195
196                    @Override
197                    public void onCommand(Object o) {
198                        Command command = (Command) o;
199                        serviceLocalCommand(command);
200                    }
201
202                    @Override
203                    public void onException(IOException error) {
204                        serviceLocalException(error);
205                    }
206                });
207                duplexInboundLocalBroker.start();
208            }
209
210            localBroker.setTransportListener(new DefaultTransportListener() {
211
212                @Override
213                public void onCommand(Object o) {
214                    Command command = (Command) o;
215                    serviceLocalCommand(command);
216                }
217
218                @Override
219                public void onException(IOException error) {
220                    if (!futureLocalBrokerInfo.isDone()) {
221                        futureLocalBrokerInfo.cancel(true);
222                        return;
223                    }
224                    serviceLocalException(error);
225                }
226            });
227
228            remoteBroker.setTransportListener(new DefaultTransportListener() {
229
230                @Override
231                public void onCommand(Object o) {
232                    Command command = (Command) o;
233                    serviceRemoteCommand(command);
234                }
235
236                @Override
237                public void onException(IOException error) {
238                    if (!futureRemoteBrokerInfo.isDone()) {
239                        futureRemoteBrokerInfo.cancel(true);
240                        return;
241                    }
242                    serviceRemoteException(error);
243                }
244            });
245
246            remoteBroker.start();
247            localBroker.start();
248
249            if (!disposed.get()) {
250                try {
251                    triggerStartAsyncNetworkBridgeCreation();
252                } catch (IOException e) {
253                    LOG.warn("Caught exception from remote start", e);
254                }
255            } else {
256                LOG.warn("Bridge was disposed before the start() method was fully executed.");
257                throw new TransportDisposedIOException();
258            }
259        }
260    }
261
262    @Override
263    public void stop() throws Exception {
264        if (started.compareAndSet(true, false)) {
265            if (disposed.compareAndSet(false, true)) {
266                LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName);
267
268                futureRemoteBrokerInfo.cancel(true);
269                futureLocalBrokerInfo.cancel(true);
270
271                NetworkBridgeListener l = this.networkBridgeListener;
272                if (l != null) {
273                    l.onStop(this);
274                }
275                try {
276                    // local start complete
277                    if (startedLatch.getCount() < 2) {
278                        LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
279                                configuration.getBrokerName(), this, remoteBrokerName
280                        });
281                        brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
282                        brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
283                    }
284
285                    remoteBridgeStarted.set(false);
286                    final CountDownLatch sendShutdown = new CountDownLatch(1);
287
288                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
289                        @Override
290                        public void run() {
291                            try {
292                                serialExecutor.shutdown();
293                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
294                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
295                                    LOG.info("pending tasks on stop {}", pendingTasks);
296                                }
297                                localBroker.oneway(new ShutdownInfo());
298                                remoteBroker.oneway(new ShutdownInfo());
299                            } catch (Throwable e) {
300                                LOG.debug("Caught exception sending shutdown", e);
301                            } finally {
302                                sendShutdown.countDown();
303                            }
304
305                        }
306                    }, "ActiveMQ ForwardingBridge StopTask");
307
308                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
309                        LOG.info("Network Could not shutdown in a timely manner");
310                    }
311                } finally {
312                    ServiceStopper ss = new ServiceStopper();
313                    ss.stop(remoteBroker);
314                    ss.stop(localBroker);
315                    ss.stop(duplexInboundLocalBroker);
316                    // Release the started Latch since another thread could be
317                    // stuck waiting for it to start up.
318                    startedLatch.countDown();
319                    startedLatch.countDown();
320                    localStartedLatch.countDown();
321                    staticDestinationsLatch.countDown();
322
323                    ss.throwFirstException();
324                }
325            }
326
327            LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName);
328        }
329    }
330
331    protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
332        brokerService.getTaskRunnerFactory().execute(new Runnable() {
333            @Override
334            public void run() {
335                final String originalName = Thread.currentThread().getName();
336                Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
337                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
338
339                try {
340                    // First we collect the info data from both the local and remote ends
341                    collectBrokerInfos();
342
343                    // Once we have all required broker info we can attempt to start
344                    // the local and then remote sides of the bridge.
345                    doStartLocalAndRemoteBridges();
346                } finally {
347                    Thread.currentThread().setName(originalName);
348                }
349            }
350        });
351    }
352
353    private void collectBrokerInfos() {
354        int timeout = 30000;
355        TcpTransport tcpTransport = remoteBroker.narrow(TcpTransport.class);
356        if (tcpTransport != null) {
357           timeout = tcpTransport.getConnectionTimeout();
358        }
359
360        // First wait for the remote to feed us its BrokerInfo, then we can check on
361        // the LocalBrokerInfo and decide is this is a loop.
362        try {
363            remoteBrokerInfo = futureRemoteBrokerInfo.get(timeout, TimeUnit.MILLISECONDS);
364            if (remoteBrokerInfo == null) {
365                serviceLocalException(new Throwable("remoteBrokerInfo is null"));
366                return;
367            }
368        } catch (Exception e) {
369            serviceRemoteException(e);
370            return;
371        }
372
373        try {
374            localBrokerInfo = futureLocalBrokerInfo.get(timeout, TimeUnit.MILLISECONDS);
375            if (localBrokerInfo == null) {
376                serviceLocalException(new Throwable("localBrokerInfo is null"));
377                return;
378            }
379
380            // Before we try and build the bridge lets check if we are in a loop
381            // and if so just stop now before registering anything.
382            remoteBrokerId = remoteBrokerInfo.getBrokerId();
383            if (localBrokerId.equals(remoteBrokerId)) {
384                LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
385                        configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
386                });
387                ServiceSupport.dispose(localBroker);
388                ServiceSupport.dispose(remoteBroker);
389                // the bridge is left in a bit of limbo, but it won't get retried
390                // in this state.
391                return;
392            }
393
394            // Fill in the remote broker's information now.
395            remoteBrokerPath[0] = remoteBrokerId;
396            remoteBrokerName = remoteBrokerInfo.getBrokerName();
397            if (configuration.isUseBrokerNamesAsIdSeed()) {
398                idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
399            }
400        } catch (Throwable e) {
401            serviceLocalException(e);
402        }
403    }
404
405    private void doStartLocalAndRemoteBridges() {
406
407        if (disposed.get()) {
408            return;
409        }
410
411        if (isCreatedByDuplex()) {
412            // apply remote (propagated) configuration to local duplex bridge before start
413            Properties props = null;
414            try {
415                props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
416                IntrospectionSupport.getProperties(configuration, props, null);
417                if (configuration.getExcludedDestinations() != null) {
418                    excludedDestinations = configuration.getExcludedDestinations().toArray(
419                            new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
420                }
421                if (configuration.getStaticallyIncludedDestinations() != null) {
422                    staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
423                            new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
424                }
425                if (configuration.getDynamicallyIncludedDestinations() != null) {
426                    dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
427                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
428                }
429            } catch (Throwable t) {
430                LOG.error("Error mapping remote configuration: {}", props, t);
431            }
432        }
433
434        try {
435            startLocalBridge();
436        } catch (Throwable e) {
437            serviceLocalException(e);
438            return;
439        }
440
441        try {
442            startRemoteBridge();
443        } catch (Throwable e) {
444            serviceRemoteException(e);
445            return;
446        }
447
448        try {
449            if (safeWaitUntilStarted()) {
450                setupStaticDestinations();
451                staticDestinationsLatch.countDown();
452            }
453        } catch (Throwable e) {
454            serviceLocalException(e);
455        }
456    }
457
458    private void startLocalBridge() throws Throwable {
459        if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) {
460            synchronized (this) {
461                LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker);
462                if (!disposed.get()) {
463
464                    if (idGenerator == null) {
465                        throw new IllegalStateException("Id Generator cannot be null");
466                    }
467
468                    localConnectionInfo = new ConnectionInfo();
469                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
470                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
471                    localConnectionInfo.setClientId(localClientId);
472                    localConnectionInfo.setUserName(configuration.getUserName());
473                    localConnectionInfo.setPassword(configuration.getPassword());
474                    Transport originalTransport = remoteBroker;
475                    while (originalTransport instanceof TransportFilter) {
476                        originalTransport = ((TransportFilter) originalTransport).getNext();
477                    }
478                    if (originalTransport instanceof SslTransport) {
479                        X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
480                        localConnectionInfo.setTransportContext(peerCerts);
481                    }
482                    // sync requests that may fail
483                    Object resp = localBroker.request(localConnectionInfo);
484                    if (resp instanceof ExceptionResponse) {
485                        throw ((ExceptionResponse) resp).getException();
486                    }
487                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
488                    localBroker.oneway(localSessionInfo);
489
490                    if (configuration.isDuplex()) {
491                        // separate in-bound channel for forwards so we don't
492                        // contend with out-bound dispatch on same connection
493                        remoteBrokerInfo.setNetworkConnection(true);
494                        duplexInboundLocalBroker.oneway(remoteBrokerInfo);
495
496                        ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
497                        duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
498                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
499                                + configuration.getBrokerName());
500                        duplexLocalConnectionInfo.setUserName(configuration.getUserName());
501                        duplexLocalConnectionInfo.setPassword(configuration.getPassword());
502
503                        if (originalTransport instanceof SslTransport) {
504                            X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
505                            duplexLocalConnectionInfo.setTransportContext(peerCerts);
506                        }
507                        // sync requests that may fail
508                        resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
509                        if (resp instanceof ExceptionResponse) {
510                            throw ((ExceptionResponse) resp).getException();
511                        }
512                        SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
513                        duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
514                        duplexInboundLocalBroker.oneway(duplexInboundSession);
515                        duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
516                    }
517                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
518                    NetworkBridgeListener l = this.networkBridgeListener;
519                    if (l != null) {
520                        l.onStart(this);
521                    }
522
523                    // Let the local broker know the remote broker's ID.
524                    localBroker.oneway(remoteBrokerInfo);
525                    // new peer broker (a consumer can work with remote broker also)
526                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
527
528                    LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
529                            localBroker, remoteBroker, remoteBrokerName
530                    });
531                    LOG.trace("{} register bridge ({}) to {}", new Object[]{
532                            configuration.getBrokerName(), this, remoteBrokerName
533                    });
534                } else {
535                    LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
536                }
537                startedLatch.countDown();
538                localStartedLatch.countDown();
539            }
540        }
541    }
542
543    protected void startRemoteBridge() throws Exception {
544        if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) {
545            LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker);
546            synchronized (this) {
547                if (!isCreatedByDuplex()) {
548                    BrokerInfo brokerInfo = new BrokerInfo();
549                    brokerInfo.setBrokerName(configuration.getBrokerName());
550                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
551                    brokerInfo.setNetworkConnection(true);
552                    brokerInfo.setDuplexConnection(configuration.isDuplex());
553                    // set our properties
554                    Properties props = new Properties();
555                    IntrospectionSupport.getProperties(configuration, props, null);
556
557                    String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations";
558                    String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations";
559
560                    if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) {
561                        props.put(dynamicallyIncludedDestinationsKey,
562                                StringToListOfActiveMQDestinationConverter.
563                                convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true));
564                    }
565                    if (!configuration.getStaticallyIncludedDestinations().isEmpty()) {
566                        props.put(staticallyIncludedDestinationsKey,
567                                StringToListOfActiveMQDestinationConverter.
568                                convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true));
569                    }
570
571                    props.remove("networkTTL");
572                    String str = MarshallingSupport.propertiesToString(props);
573                    brokerInfo.setNetworkProperties(str);
574                    brokerInfo.setBrokerId(this.localBrokerId);
575                    remoteBroker.oneway(brokerInfo);
576                    if (configuration.isSyncDurableSubs() &&
577                            remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
578                        remoteBroker.oneway(TransportConnection.getBrokerSubscriptionInfo(brokerService));
579                    }
580                }
581                if (remoteConnectionInfo != null) {
582                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
583                }
584                remoteConnectionInfo = new ConnectionInfo();
585                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
586                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
587                remoteConnectionInfo.setUserName(configuration.getUserName());
588                remoteConnectionInfo.setPassword(configuration.getPassword());
589                remoteBroker.oneway(remoteConnectionInfo);
590
591                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
592                remoteBroker.oneway(remoteSessionInfo);
593                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
594                producerInfo.setResponseRequired(false);
595                remoteBroker.oneway(producerInfo);
596                // Listen to consumer advisory messages on the remote broker to determine demand.
597                if (!configuration.isStaticBridge()) {
598                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
599                    // always dispatch advisory message asynchronously so that
600                    // we never block the producer broker if we are slow
601                    demandConsumerInfo.setDispatchAsync(true);
602                    String advisoryTopic = configuration.getDestinationFilter();
603                    if (configuration.isBridgeTempDestinations()) {
604                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
605                    }
606                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
607                    configureConsumerPrefetch(demandConsumerInfo);
608                    remoteBroker.oneway(demandConsumerInfo);
609                }
610                startedLatch.countDown();
611            }
612        }
613    }
614
615    @Override
616    public void serviceRemoteException(Throwable error) {
617        if (!disposed.get()) {
618            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
619                LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
620                        localBroker, remoteBroker, error
621                });
622            } else {
623                LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
624                        localBroker, remoteBroker, error
625                });
626            }
627            LOG.debug("The remote Exception was: {}", error, error);
628            brokerService.getTaskRunnerFactory().execute(new Runnable() {
629                @Override
630                public void run() {
631                    ServiceSupport.dispose(getControllingService());
632                }
633            });
634            fireBridgeFailed(error);
635        }
636    }
637
638    protected void serviceRemoteCommand(Command command) {
639        if (!disposed.get()) {
640            try {
641                if (command.isMessageDispatch()) {
642                    safeWaitUntilStarted();
643                    MessageDispatch md = (MessageDispatch) command;
644                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
645                    ackAdvisory(md.getMessage());
646                } else if (command.isBrokerInfo()) {
647                    futureRemoteBrokerInfo.set((BrokerInfo) command);
648                } else if (command instanceof BrokerSubscriptionInfo) {
649                    staticDestinationsLatch.await();
650                    BrokerSubscriptionInfo subInfo = (BrokerSubscriptionInfo) command;
651                    LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
652                            this.brokerService.getBrokerName(), subInfo.getBrokerName());
653
654                    if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
655                            && !configuration.isDynamicOnly() && subInfo.getSubscriptionInfos() != null) {
656                        if (started.get()) {
657                            for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
658                                if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
659                                        matchesDynamicallyIncludedDestinations(info.getDestination())) {
660                                    serviceRemoteConsumerAdvisory(info);
661                                }
662                            }
663
664                            //After re-added, clean up any empty durables
665                            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
666                                DemandSubscription ds = i.next();
667                                if (matchesDynamicallyIncludedDestinations(ds.getLocalInfo().getDestination())) {
668                                    cleanupDurableSub(ds, i);
669                                }
670                            }
671                        }
672                    }
673                } else if (command.getClass() == ConnectionError.class) {
674                    ConnectionError ce = (ConnectionError) command;
675                    serviceRemoteException(ce.getException());
676                } else {
677                    if (isDuplex()) {
678                        LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
679                        if (command.isMessage()) {
680                            final ActiveMQMessage message = (ActiveMQMessage) command;
681                            if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
682                                serviceRemoteConsumerAdvisory(message.getDataStructure());
683                                ackAdvisory(message);
684                            } else {
685                                if (!isPermissableDestination(message.getDestination(), true)) {
686                                    return;
687                                }
688                                // message being forwarded - we need to
689                                // propagate the response to our local send
690                                if (canDuplexDispatch(message)) {
691                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
692                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
693                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
694                                            final int correlationId = message.getCommandId();
695
696                                            @Override
697                                            public void onCompletion(FutureResponse resp) {
698                                                try {
699                                                    Response reply = resp.getResult();
700                                                    reply.setCorrelationId(correlationId);
701                                                    remoteBroker.oneway(reply);
702                                                    //increment counter when messages are received in duplex mode
703                                                    networkBridgeStatistics.getReceivedCount().increment();
704                                                } catch (IOException error) {
705                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
706                                                    serviceRemoteException(error);
707                                                }
708                                            }
709                                        });
710                                    } else {
711                                        duplexInboundLocalBroker.oneway(message);
712                                        networkBridgeStatistics.getReceivedCount().increment();
713                                    }
714                                    serviceInboundMessage(message);
715                                } else {
716                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
717                                        Response reply = new Response();
718                                        reply.setCorrelationId(message.getCommandId());
719                                        remoteBroker.oneway(reply);
720                                    }
721                                }
722                            }
723                        } else {
724                            switch (command.getDataStructureType()) {
725                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
726                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
727                                        // end of initiating connection setup - propogate to initial connection to get mbean by clientid
728                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
729                                    } else {
730                                        localBroker.oneway(command);
731                                    }
732                                    break;
733                                case SessionInfo.DATA_STRUCTURE_TYPE:
734                                    localBroker.oneway(command);
735                                    break;
736                                case ProducerInfo.DATA_STRUCTURE_TYPE:
737                                    // using duplexInboundLocalProducerInfo
738                                    break;
739                                case MessageAck.DATA_STRUCTURE_TYPE:
740                                    MessageAck ack = (MessageAck) command;
741                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
742                                    if (localSub != null) {
743                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
744                                        localBroker.oneway(ack);
745                                    } else {
746                                        LOG.warn("Matching local subscription not found for ack: {}", ack);
747                                    }
748                                    break;
749                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
750                                    localStartedLatch.await();
751                                    if (started.get()) {
752                                        addConsumerInfo((ConsumerInfo) command);
753                                    } else {
754                                        // received a subscription whilst stopping
755                                        LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
756                                    }
757                                    break;
758                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
759                                    // initiator is shutting down, controlled case
760                                    // abortive close dealt with by inactivity monitor
761                                    LOG.info("Stopping network bridge on shutdown of remote broker");
762                                    serviceRemoteException(new IOException(command.toString()));
763                                    break;
764                                default:
765                                    LOG.debug("Ignoring remote command: {}", command);
766                            }
767                        }
768                    } else {
769                        switch (command.getDataStructureType()) {
770                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
771                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
772                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
773                                break;
774                            default:
775                                LOG.warn("Unexpected remote command: {}", command);
776                        }
777                    }
778                }
779            } catch (Throwable e) {
780                LOG.debug("Exception processing remote command: {}", command, e);
781                serviceRemoteException(e);
782            }
783        }
784    }
785
786    private void ackAdvisory(Message message) throws IOException {
787        demandConsumerDispatched++;
788        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
789                (configuration.getAdvisoryAckPercentage() / 100f))) {
790            MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
791            ack.setConsumerId(demandConsumerInfo.getConsumerId());
792            remoteBroker.oneway(ack);
793            demandConsumerDispatched = 0;
794        }
795    }
796
797    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
798        final int networkTTL = configuration.getConsumerTTL();
799        if (data.getClass() == ConsumerInfo.class) {
800            // Create a new local subscription
801            ConsumerInfo info = (ConsumerInfo) data;
802            BrokerId[] path = info.getBrokerPath();
803
804            if (info.isBrowser()) {
805                LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
806                return;
807            }
808
809            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
810                LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
811                        configuration.getBrokerName(), remoteBrokerName, networkTTL, info
812                });
813                return;
814            }
815
816            if (contains(path, localBrokerPath[0])) {
817                // Ignore this consumer as it's a consumer we locally sent to the broker.
818                LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
819                        configuration.getBrokerName(), remoteBrokerName, info
820                });
821                return;
822            }
823
824            if (!isPermissableDestination(info.getDestination())) {
825                // ignore if not in the permitted or in the excluded list
826                LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
827                        configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
828                });
829                return;
830            }
831
832            // in a cyclic network there can be multiple bridges per broker that can propagate
833            // a network subscription so there is a need to synchronize on a shared entity
834            synchronized (brokerService.getVmConnectorURI()) {
835                addConsumerInfo(info);
836            }
837        } else if (data.getClass() == DestinationInfo.class) {
838            // It's a destination info - we want to pass up information about temporary destinations
839            final DestinationInfo destInfo = (DestinationInfo) data;
840            BrokerId[] path = destInfo.getBrokerPath();
841            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
842                LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
843                        configuration.getBrokerName(), destInfo, networkTTL
844                });
845                return;
846            }
847            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
848                LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
849                return;
850            }
851            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
852            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
853                // re-set connection id so comes from here
854                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
855                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
856            }
857            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
858            LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
859                    configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
860            });
861            if (destInfo.isRemoveOperation()) {
862                // Serialize with removeSub operations such that all removeSub advisories
863                // are generated
864                serialExecutor.execute(new Runnable() {
865                    @Override
866                    public void run() {
867                        try {
868                            localBroker.oneway(destInfo);
869                        } catch (IOException e) {
870                            LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
871                        }
872                    }
873                });
874            } else {
875                localBroker.oneway(destInfo);
876            }
877        } else if (data.getClass() == RemoveInfo.class) {
878            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
879            removeDemandSubscription(id);
880
881            if (forcedDurableRemoteId.remove(id)) {
882                for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
883                    DemandSubscription ds = i.next();
884                    boolean removed = ds.removeForcedDurableConsumer(id);
885                    if (removed) {
886                        cleanupDurableSub(ds, i);
887                    }
888                }
889           }
890
891        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
892            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
893            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
894            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
895                DemandSubscription ds = i.next();
896                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
897                if (removed) {
898                    cleanupDurableSub(ds, i);
899                }
900            }
901        }
902    }
903
904    private void cleanupDurableSub(final DemandSubscription ds,
905            Iterator<DemandSubscription> i) throws IOException {
906        if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
907                && ds.getForcedDurableConsumersSize() == 0) {
908
909            // deactivate subscriber
910            RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
911            localBroker.oneway(removeInfo);
912
913            // remove subscriber
914            RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
915            sending.setClientId(localClientId);
916            sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
917            sending.setConnectionId(this.localConnectionInfo.getConnectionId());
918            localBroker.oneway(sending);
919
920            //remove subscriber from map
921            i.remove();
922        }
923    }
924
925    @Override
926    public void serviceLocalException(Throwable error) {
927        serviceLocalException(null, error);
928    }
929
930    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
931        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
932        if (!disposed.get()) {
933            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
934                // not a reason to terminate the bridge - temps can disappear with
935                // pending sends as the demand sub may outlive the remote dest
936                if (messageDispatch != null) {
937                    LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
938                    try {
939                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
940                        poisonAck.setPoisonCause(error);
941                        localBroker.oneway(poisonAck);
942                    } catch (IOException ioe) {
943                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
944                    }
945                    fireFailedForwardAdvisory(messageDispatch, error);
946                } else {
947                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
948                }
949                return;
950            }
951
952            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
953            LOG.debug("The local Exception was: {}", error, error);
954
955            brokerService.getTaskRunnerFactory().execute(new Runnable() {
956                @Override
957                public void run() {
958                    ServiceSupport.dispose(getControllingService());
959                }
960            });
961            fireBridgeFailed(error);
962        }
963    }
964
965    private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
966        if (configuration.isAdvisoryForFailedForward()) {
967            AdvisoryBroker advisoryBroker = null;
968            try {
969                advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
970
971                if (advisoryBroker != null) {
972                    ConnectionContext context = new ConnectionContext();
973                    context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
974                    context.setBroker(brokerService.getBroker());
975
976                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
977                    advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
978                    advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
979                            advisoryMessage);
980
981                }
982            } catch (Exception e) {
983                LOG.warn("failed to fire forward failure advisory, cause: {}", e);
984                LOG.debug("detail", e);
985            }
986        }
987    }
988
989    protected Service getControllingService() {
990        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
991    }
992
993    protected void addSubscription(DemandSubscription sub) throws IOException {
994        if (sub != null) {
995            if (isDuplex()) {
996                // async vm transport, need to wait for completion
997                localBroker.request(sub.getLocalInfo());
998            } else {
999                localBroker.oneway(sub.getLocalInfo());
1000            }
1001        }
1002    }
1003
1004    protected void removeSubscription(final DemandSubscription sub) throws IOException {
1005        if (sub != null) {
1006            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
1007
1008            // ensure not available for conduit subs pending removal
1009            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1010            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1011
1012            // continue removal in separate thread to free up this thread for outstanding responses
1013            // Serialize with removeDestination operations so that removeSubs are serialized with
1014            // removeDestinations such that all removeSub advisories are generated
1015            serialExecutor.execute(new Runnable() {
1016                @Override
1017                public void run() {
1018                    sub.waitForCompletion();
1019                    try {
1020                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
1021                    } catch (IOException e) {
1022                        LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
1023                    }
1024                }
1025            });
1026        }
1027    }
1028
1029    protected Message configureMessage(MessageDispatch md) throws IOException {
1030        Message message = md.getMessage().copy();
1031        // Update the packet to show where it came from.
1032        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
1033        message.setProducerId(producerInfo.getProducerId());
1034        message.setDestination(md.getDestination());
1035        message.setMemoryUsage(null);
1036        if (message.getOriginalTransactionId() == null) {
1037            message.setOriginalTransactionId(message.getTransactionId());
1038        }
1039        message.setTransactionId(null);
1040        if (configuration.isUseCompression()) {
1041            message.compress();
1042        }
1043        return message;
1044    }
1045
1046    protected void serviceLocalCommand(Command command) {
1047        if (!disposed.get()) {
1048            try {
1049                if (command.isMessageDispatch()) {
1050                    safeWaitUntilStarted();
1051                    networkBridgeStatistics.getEnqueues().increment();
1052                    final MessageDispatch md = (MessageDispatch) command;
1053                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
1054                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
1055
1056                        if (suppressMessageDispatch(md, sub)) {
1057                            LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
1058                                    configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
1059                            });
1060                            // still ack as it may be durable
1061                            try {
1062                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1063                            } finally {
1064                                sub.decrementOutstandingResponses();
1065                            }
1066                            return;
1067                        }
1068
1069                        Message message = configureMessage(md);
1070                        LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
1071                                configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
1072                        });
1073                        if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
1074                            try {
1075                                // never request b/c they are eventually                     acked async
1076                                remoteBroker.oneway(message);
1077                            } finally {
1078                                sub.decrementOutstandingResponses();
1079                            }
1080                            return;
1081                        }
1082                        if (isPermissableDestination(md.getDestination())) {
1083                           if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
1084
1085                              // The message was not sent using async send, so we should only
1086                              // ack the local broker when we get confirmation that the remote
1087                              // broker has received the message.
1088                              remoteBroker.asyncRequest(message, new ResponseCallback() {
1089                                 @Override
1090                                 public void onCompletion(FutureResponse future) {
1091                                    try {
1092                                       Response response = future.getResult();
1093                                       if (response.isException()) {
1094                                          ExceptionResponse er = (ExceptionResponse) response;
1095                                          serviceLocalException(md, er.getException());
1096                                       } else {
1097                                          localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1098                                          networkBridgeStatistics.getDequeues().increment();
1099                                       }
1100                                    } catch (IOException e) {
1101                                       serviceLocalException(md, e);
1102                                    } finally {
1103                                       sub.decrementOutstandingResponses();
1104                                    }
1105                                 }
1106                              });
1107
1108                           } else {
1109                              // If the message was originally sent using async send, we will
1110                              // preserve that QOS by bridging it using an async send (small chance
1111                              // of message loss).
1112                              try {
1113                                 remoteBroker.oneway(message);
1114                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1115                                 networkBridgeStatistics.getDequeues().increment();
1116                              } finally {
1117                                 sub.decrementOutstandingResponses();
1118                              }
1119                           }
1120                           serviceOutbound(message);
1121                        }
1122                    } else {
1123                        LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
1124                    }
1125                } else if (command.isBrokerInfo()) {
1126                    futureLocalBrokerInfo.set((BrokerInfo) command);
1127                } else if (command.isShutdownInfo()) {
1128                    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
1129                    stop();
1130                } else if (command.getClass() == ConnectionError.class) {
1131                    ConnectionError ce = (ConnectionError) command;
1132                    serviceLocalException(ce.getException());
1133                } else {
1134                    switch (command.getDataStructureType()) {
1135                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
1136                            break;
1137                        case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE:
1138                            break;
1139                        default:
1140                            LOG.warn("Unexpected local command: {}", command);
1141                    }
1142                }
1143            } catch (Throwable e) {
1144                LOG.warn("Caught an exception processing local command", e);
1145                serviceLocalException(e);
1146            }
1147        }
1148    }
1149
1150    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1151        boolean suppress = false;
1152        // for durable subs, suppression via filter leaves dangling acks so we
1153        // need to check here and allow the ack irrespective
1154        if (sub.getLocalInfo().isDurable()) {
1155            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
1156            messageEvalContext.setMessageReference(md.getMessage());
1157            messageEvalContext.setDestination(md.getDestination());
1158            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1159        }
1160        return suppress;
1161    }
1162
1163    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1164        if (brokerPath != null) {
1165            for (BrokerId id : brokerPath) {
1166                if (brokerId.equals(id)) {
1167                    return true;
1168                }
1169            }
1170        }
1171        return false;
1172    }
1173
1174    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1175        if (brokerPath == null || brokerPath.length == 0) {
1176            return pathsToAppend;
1177        }
1178        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1179        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1180        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1181        return rc;
1182    }
1183
1184    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1185        if (brokerPath == null || brokerPath.length == 0) {
1186            return new BrokerId[]{idToAppend};
1187        }
1188        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1189        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1190        rc[brokerPath.length] = idToAppend;
1191        return rc;
1192    }
1193
1194    protected boolean isPermissableDestination(ActiveMQDestination destination) {
1195        return isPermissableDestination(destination, false);
1196    }
1197
1198    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1199        // Are we not bridging temporary destinations?
1200        if (destination.isTemporary()) {
1201            if (allowTemporary) {
1202                return true;
1203            } else {
1204                return configuration.isBridgeTempDestinations();
1205            }
1206        }
1207
1208        ActiveMQDestination[] dests = excludedDestinations;
1209        if (dests != null && dests.length > 0) {
1210            for (ActiveMQDestination dest : dests) {
1211                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1212                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1213                    return false;
1214                }
1215            }
1216        }
1217
1218        dests = staticallyIncludedDestinations;
1219        if (dests != null && dests.length > 0) {
1220            for (ActiveMQDestination dest : dests) {
1221                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1222                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1223                    return true;
1224                }
1225            }
1226        }
1227
1228        dests = dynamicallyIncludedDestinations;
1229        if (dests != null && dests.length > 0) {
1230            for (ActiveMQDestination dest : dests) {
1231                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1232                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1233                    return true;
1234                }
1235            }
1236
1237            return false;
1238        }
1239
1240        return true;
1241    }
1242
1243    private boolean matchesDynamicallyIncludedDestinations(ActiveMQDestination destination) {
1244        ActiveMQDestination[] dests = dynamicallyIncludedDestinations;
1245        if (dests != null && dests.length > 0) {
1246            for (ActiveMQDestination dest : dests) {
1247                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1248                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1249                    return true;
1250                }
1251            }
1252        }
1253
1254        return false;
1255    }
1256
1257    protected ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) {
1258        if (dests != null && dests.length > 0) {
1259            for (ActiveMQDestination dest : dests) {
1260                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1261                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1262                    return dest;
1263                }
1264            }
1265        }
1266
1267        return null;
1268    }
1269
1270    /**
1271     * Subscriptions for these destinations are always created
1272     */
1273    protected void setupStaticDestinations() {
1274        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1275        if (dests != null) {
1276            for (ActiveMQDestination dest : dests) {
1277                if (isPermissableDestination(dest)) {
1278                    DemandSubscription sub = createDemandSubscription(dest, null);
1279                    sub.setStaticallyIncluded(true);
1280                    try {
1281                        addSubscription(sub);
1282                    } catch (IOException e) {
1283                        LOG.error("Failed to add static destination {}", dest, e);
1284                    }
1285                    LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
1286                } else {
1287                    LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
1288                }
1289            }
1290        }
1291    }
1292
1293    protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1294        ConsumerInfo info = consumerInfo.copy();
1295        addRemoteBrokerToBrokerPath(info);
1296        DemandSubscription sub = createDemandSubscription(info);
1297        if (sub != null) {
1298            if (duplicateSuppressionIsRequired(sub)) {
1299                undoMapRegistration(sub);
1300            } else {
1301                if (consumerInfo.isDurable()) {
1302                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
1303                }
1304                addSubscription(sub);
1305                LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);
1306            }
1307        }
1308    }
1309
1310    private void undoMapRegistration(DemandSubscription sub) {
1311        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1312        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1313    }
1314
1315    /*
1316     * check our existing subs networkConsumerIds against the list of network
1317     * ids in this subscription A match means a duplicate which we suppress for
1318     * topics and maybe for queues
1319     */
1320    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1321        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1322        boolean suppress = false;
1323
1324        if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
1325                && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1326            return suppress;
1327        }
1328
1329        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1330        Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1331        for (Subscription sub : currentSubs) {
1332            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1333            if (!networkConsumers.isEmpty()) {
1334                if (matchFound(candidateConsumers, networkConsumers)) {
1335                    if (isInActiveDurableSub(sub)) {
1336                        suppress = false;
1337                    } else {
1338                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1339                    }
1340                    break;
1341                }
1342            }
1343        }
1344        return suppress;
1345    }
1346
1347    private boolean isInActiveDurableSub(Subscription sub) {
1348        return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1349    }
1350
1351    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1352        boolean suppress = false;
1353
1354        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1355            LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
1356                    configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
1357            });
1358            suppress = true;
1359        } else {
1360            // remove the existing lower priority duplicate and allow this candidate
1361            try {
1362                removeDuplicateSubscription(existingSub);
1363
1364                LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
1365                        configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
1366                });
1367            } catch (IOException e) {
1368                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
1369            }
1370        }
1371        return suppress;
1372    }
1373
1374    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1375        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1376            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1377                break;
1378            }
1379        }
1380    }
1381
1382    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1383        boolean found = false;
1384        for (ConsumerId aliasConsumer : networkConsumers) {
1385            if (candidateConsumers.contains(aliasConsumer)) {
1386                found = true;
1387                break;
1388            }
1389        }
1390        return found;
1391    }
1392
1393    protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1394        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1395        Region region;
1396        Collection<Subscription> subs;
1397
1398        region = null;
1399        switch (dest.getDestinationType()) {
1400            case ActiveMQDestination.QUEUE_TYPE:
1401                region = region_broker.getQueueRegion();
1402                break;
1403            case ActiveMQDestination.TOPIC_TYPE:
1404                region = region_broker.getTopicRegion();
1405                break;
1406            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1407                region = region_broker.getTempQueueRegion();
1408                break;
1409            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1410                region = region_broker.getTempTopicRegion();
1411                break;
1412        }
1413
1414        if (region instanceof AbstractRegion) {
1415            subs = ((AbstractRegion) region).getSubscriptions().values();
1416        } else {
1417            subs = null;
1418        }
1419
1420        return subs;
1421    }
1422
1423    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1424        // add our original id to ourselves
1425        info.addNetworkConsumerId(info.getConsumerId());
1426        return doCreateDemandSubscription(info);
1427    }
1428
1429    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1430        DemandSubscription result = new DemandSubscription(info);
1431        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1432        if (info.getDestination().isTemporary()) {
1433            // reset the local connection Id
1434            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1435            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1436        }
1437
1438        if (configuration.isDecreaseNetworkConsumerPriority()) {
1439            byte priority = (byte) configuration.getConsumerPriorityBase();
1440            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1441                // The longer the path to the consumer, the less it's consumer priority.
1442                priority -= info.getBrokerPath().length + 1;
1443            }
1444            result.getLocalInfo().setPriority(priority);
1445            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
1446        }
1447        configureDemandSubscription(info, result);
1448        return result;
1449    }
1450
1451    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination, final String subscriptionName) {
1452        ConsumerInfo info = new ConsumerInfo();
1453        info.setNetworkSubscription(true);
1454        info.setDestination(destination);
1455
1456        if (subscriptionName != null) {
1457            info.setSubscriptionName(subscriptionName);
1458        }
1459
1460        // Indicate that this subscription is being made on behalf of the remote broker.
1461        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
1462
1463        // the remote info held by the DemandSubscription holds the original
1464        // consumerId, the local info get's overwritten
1465        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1466        DemandSubscription result = null;
1467        try {
1468            result = createDemandSubscription(info);
1469        } catch (IOException e) {
1470            LOG.error("Failed to create DemandSubscription ", e);
1471        }
1472        return result;
1473    }
1474
1475    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1476        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
1477                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
1478            sub.getLocalInfo().setDispatchAsync(true);
1479        } else {
1480            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1481        }
1482        configureConsumerPrefetch(sub.getLocalInfo());
1483        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1484        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1485
1486        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1487        if (!info.isDurable()) {
1488            // This works for now since we use a VM connection to the local broker.
1489            // may need to change if we ever subscribe to a remote broker.
1490            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1491        } else {
1492            sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
1493        }
1494    }
1495
1496    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1497        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1498        LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
1499                configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
1500        });
1501        if (sub != null) {
1502            removeSubscription(sub);
1503            LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
1504                    configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
1505            });
1506        }
1507    }
1508
1509    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1510        boolean removeDone = false;
1511        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1512        if (sub != null) {
1513            try {
1514                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1515                removeDone = true;
1516            } catch (IOException e) {
1517                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e);
1518            }
1519        }
1520        return removeDone;
1521    }
1522
1523    /**
1524     * Performs a timed wait on the started latch and then checks for disposed
1525     * before performing another wait each time the the started wait times out.
1526     */
1527    protected boolean safeWaitUntilStarted() throws InterruptedException {
1528        while (!disposed.get()) {
1529            if (startedLatch.await(1, TimeUnit.SECONDS)) {
1530                break;
1531            }
1532        }
1533        return !disposed.get();
1534    }
1535
1536    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1537        NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1538        if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1539            PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1540            if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1541                filterFactory = entry.getNetworkBridgeFilterFactory();
1542            }
1543        }
1544        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
1545    }
1546
1547    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1548        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1549    }
1550
1551    protected BrokerId[] getRemoteBrokerPath() {
1552        return remoteBrokerPath;
1553    }
1554
1555    @Override
1556    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1557        this.networkBridgeListener = listener;
1558    }
1559
1560    private void fireBridgeFailed(Throwable reason) {
1561        LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
1562        NetworkBridgeListener l = this.networkBridgeListener;
1563        if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1564            l.bridgeFailed();
1565        }
1566    }
1567
1568    /**
1569     * @return Returns the dynamicallyIncludedDestinations.
1570     */
1571    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1572        return dynamicallyIncludedDestinations;
1573    }
1574
1575    /**
1576     * @param dynamicallyIncludedDestinations
1577     *         The dynamicallyIncludedDestinations to set.
1578     */
1579    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1580        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1581    }
1582
1583    /**
1584     * @return Returns the excludedDestinations.
1585     */
1586    public ActiveMQDestination[] getExcludedDestinations() {
1587        return excludedDestinations;
1588    }
1589
1590    /**
1591     * @param excludedDestinations The excludedDestinations to set.
1592     */
1593    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1594        this.excludedDestinations = excludedDestinations;
1595    }
1596
1597    /**
1598     * @return Returns the staticallyIncludedDestinations.
1599     */
1600    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1601        return staticallyIncludedDestinations;
1602    }
1603
1604    /**
1605     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
1606     */
1607    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1608        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1609    }
1610
1611    /**
1612     * @return Returns the durableDestinations.
1613     */
1614    public ActiveMQDestination[] getDurableDestinations() {
1615        return durableDestinations;
1616    }
1617
1618    /**
1619     * @param durableDestinations The durableDestinations to set.
1620     */
1621    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1622        this.durableDestinations = durableDestinations;
1623    }
1624
1625    /**
1626     * @return Returns the localBroker.
1627     */
1628    public Transport getLocalBroker() {
1629        return localBroker;
1630    }
1631
1632    /**
1633     * @return Returns the remoteBroker.
1634     */
1635    public Transport getRemoteBroker() {
1636        return remoteBroker;
1637    }
1638
1639    /**
1640     * @return the createdByDuplex
1641     */
1642    public boolean isCreatedByDuplex() {
1643        return this.createdByDuplex;
1644    }
1645
1646    /**
1647     * @param createdByDuplex the createdByDuplex to set
1648     */
1649    public void setCreatedByDuplex(boolean createdByDuplex) {
1650        this.createdByDuplex = createdByDuplex;
1651    }
1652
1653    @Override
1654    public String getRemoteAddress() {
1655        return remoteBroker.getRemoteAddress();
1656    }
1657
1658    @Override
1659    public String getLocalAddress() {
1660        return localBroker.getRemoteAddress();
1661    }
1662
1663    @Override
1664    public String getRemoteBrokerName() {
1665        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1666    }
1667
1668    @Override
1669    public String getRemoteBrokerId() {
1670        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
1671    }
1672
1673    @Override
1674    public String getLocalBrokerName() {
1675        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1676    }
1677
1678    @Override
1679    public long getDequeueCounter() {
1680        return networkBridgeStatistics.getDequeues().getCount();
1681    }
1682
1683    @Override
1684    public long getEnqueueCounter() {
1685        return networkBridgeStatistics.getEnqueues().getCount();
1686    }
1687
1688    @Override
1689    public NetworkBridgeStatistics getNetworkBridgeStatistics() {
1690        return networkBridgeStatistics;
1691    }
1692
1693    protected boolean isDuplex() {
1694        return configuration.isDuplex() || createdByDuplex;
1695    }
1696
1697    public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1698        return subscriptionMapByRemoteId;
1699    }
1700
1701    @Override
1702    public void setBrokerService(BrokerService brokerService) {
1703        this.brokerService = brokerService;
1704        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1705        localBrokerPath[0] = localBrokerId;
1706    }
1707
1708    @Override
1709    public void setMbeanObjectName(ObjectName objectName) {
1710        this.mbeanObjectName = objectName;
1711    }
1712
1713    @Override
1714    public ObjectName getMbeanObjectName() {
1715        return mbeanObjectName;
1716    }
1717
1718    @Override
1719    public void resetStats() {
1720        networkBridgeStatistics.reset();
1721    }
1722
1723    /*
1724     * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1725     * remote sides of the network bridge.
1726     */
1727    private static class FutureBrokerInfo implements Future<BrokerInfo> {
1728
1729        private final CountDownLatch slot = new CountDownLatch(1);
1730        private final AtomicBoolean disposed;
1731        private volatile BrokerInfo info = null;
1732
1733        public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1734            this.info = info;
1735            this.disposed = disposed;
1736        }
1737
1738        @Override
1739        public boolean cancel(boolean mayInterruptIfRunning) {
1740            slot.countDown();
1741            return true;
1742        }
1743
1744        @Override
1745        public boolean isCancelled() {
1746            return slot.getCount() == 0 && info == null;
1747        }
1748
1749        @Override
1750        public boolean isDone() {
1751            return info != null;
1752        }
1753
1754        @Override
1755        public BrokerInfo get() throws InterruptedException, ExecutionException {
1756            try {
1757                if (info == null) {
1758                    while (!disposed.get()) {
1759                        if (slot.await(1, TimeUnit.SECONDS)) {
1760                            break;
1761                        }
1762                    }
1763                }
1764                return info;
1765            } catch (InterruptedException e) {
1766                Thread.currentThread().interrupt();
1767                LOG.debug("Operation interrupted: {}", e, e);
1768                throw new InterruptedException("Interrupted.");
1769            }
1770        }
1771
1772        @Override
1773        public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1774            try {
1775                if (info == null) {
1776                    long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1777
1778                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
1779                        if (slot.await(1, TimeUnit.MILLISECONDS)) {
1780                            break;
1781                        }
1782                    }
1783                    if (info == null) {
1784                        throw new TimeoutException();
1785                    }
1786                }
1787                return info;
1788            } catch (InterruptedException e) {
1789                throw new InterruptedException("Interrupted.");
1790            }
1791        }
1792
1793        public void set(BrokerInfo info) {
1794            this.info = info;
1795            this.slot.countDown();
1796        }
1797    }
1798
1799    protected void serviceOutbound(Message message) {
1800        NetworkBridgeListener l = this.networkBridgeListener;
1801        if (l != null) {
1802            l.onOutboundMessage(this, message);
1803        }
1804    }
1805
1806    protected void serviceInboundMessage(Message message) {
1807        NetworkBridgeListener l = this.networkBridgeListener;
1808        if (l != null) {
1809            l.onInboundMessage(this, message);
1810        }
1811    }
1812
1813    protected boolean canDuplexDispatch(Message message) {
1814        boolean result = true;
1815        if (configuration.isCheckDuplicateMessagesOnDuplex()){
1816            final long producerSequenceId = message.getMessageId().getProducerSequenceId();
1817            //  messages are multiplexed on this producer so we need to query the persistenceAdapter
1818            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
1819            if (producerSequenceId <= lastStoredForMessageProducer) {
1820                result = false;
1821                LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
1822                        (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
1823                });
1824            }
1825        }
1826        return result;
1827    }
1828
1829    protected long getStoredSequenceIdForMessage(MessageId messageId) {
1830        try {
1831            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
1832        } catch (IOException ignored) {
1833            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
1834        }
1835        return -1;
1836    }
1837
1838    protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) {
1839        //If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly
1840        //set then use it, else default to the prefetchSize setting
1841        if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) &&
1842                configuration.getAdvisoryPrefetchSize() > 0) {
1843            consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize());
1844        } else {
1845            consumerInfo.setPrefetchSize(configuration.getPrefetchSize());
1846        }
1847    }
1848
1849}