001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Collections;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Properties;
028import java.util.Set;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031import java.util.concurrent.CountDownLatch;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.ExecutorService;
034import java.util.concurrent.Executors;
035import java.util.concurrent.Future;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import java.util.concurrent.atomic.AtomicBoolean;
039
040import javax.management.ObjectName;
041
042import org.apache.activemq.DestinationDoesNotExistException;
043import org.apache.activemq.Service;
044import org.apache.activemq.advisory.AdvisoryBroker;
045import org.apache.activemq.advisory.AdvisorySupport;
046import org.apache.activemq.broker.BrokerService;
047import org.apache.activemq.broker.BrokerServiceAware;
048import org.apache.activemq.broker.ConnectionContext;
049import org.apache.activemq.broker.TransportConnection;
050import org.apache.activemq.broker.region.AbstractRegion;
051import org.apache.activemq.broker.region.DurableTopicSubscription;
052import org.apache.activemq.broker.region.Region;
053import org.apache.activemq.broker.region.RegionBroker;
054import org.apache.activemq.broker.region.Subscription;
055import org.apache.activemq.broker.region.policy.PolicyEntry;
056import org.apache.activemq.command.ActiveMQDestination;
057import org.apache.activemq.command.ActiveMQMessage;
058import org.apache.activemq.command.ActiveMQTempDestination;
059import org.apache.activemq.command.ActiveMQTopic;
060import org.apache.activemq.command.BrokerId;
061import org.apache.activemq.command.BrokerInfo;
062import org.apache.activemq.command.BrokerSubscriptionInfo;
063import org.apache.activemq.command.Command;
064import org.apache.activemq.command.CommandTypes;
065import org.apache.activemq.command.ConnectionError;
066import org.apache.activemq.command.ConnectionId;
067import org.apache.activemq.command.ConnectionInfo;
068import org.apache.activemq.command.ConsumerId;
069import org.apache.activemq.command.ConsumerInfo;
070import org.apache.activemq.command.DataStructure;
071import org.apache.activemq.command.DestinationInfo;
072import org.apache.activemq.command.ExceptionResponse;
073import org.apache.activemq.command.KeepAliveInfo;
074import org.apache.activemq.command.Message;
075import org.apache.activemq.command.MessageAck;
076import org.apache.activemq.command.MessageDispatch;
077import org.apache.activemq.command.MessageId;
078import org.apache.activemq.command.NetworkBridgeFilter;
079import org.apache.activemq.command.ProducerInfo;
080import org.apache.activemq.command.RemoveInfo;
081import org.apache.activemq.command.RemoveSubscriptionInfo;
082import org.apache.activemq.command.Response;
083import org.apache.activemq.command.SessionInfo;
084import org.apache.activemq.command.ShutdownInfo;
085import org.apache.activemq.command.SubscriptionInfo;
086import org.apache.activemq.command.WireFormatInfo;
087import org.apache.activemq.filter.DestinationFilter;
088import org.apache.activemq.filter.MessageEvaluationContext;
089import org.apache.activemq.security.SecurityContext;
090import org.apache.activemq.transport.DefaultTransportListener;
091import org.apache.activemq.transport.FutureResponse;
092import org.apache.activemq.transport.ResponseCallback;
093import org.apache.activemq.transport.Transport;
094import org.apache.activemq.transport.TransportDisposedIOException;
095import org.apache.activemq.transport.TransportFilter;
096import org.apache.activemq.transport.tcp.SslTransport;
097import org.apache.activemq.transport.tcp.TcpTransport;
098import org.apache.activemq.util.IdGenerator;
099import org.apache.activemq.util.IntrospectionSupport;
100import org.apache.activemq.util.LongSequenceGenerator;
101import org.apache.activemq.util.MarshallingSupport;
102import org.apache.activemq.util.ServiceStopper;
103import org.apache.activemq.util.ServiceSupport;
104import org.apache.activemq.util.StringToListOfActiveMQDestinationConverter;
105import org.slf4j.Logger;
106import org.slf4j.LoggerFactory;
107
108/**
109 * A useful base class for implementing demand forwarding bridges.
110 */
111public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
    // Transports for the two ends of the bridge; both are supplied to the constructor.
    protected final Transport localBroker;
    protected final Transport remoteBroker;
    // Source of connection ids; replaced in collectBrokerInfos() when
    // configuration.isUseBrokerNamesAsIdSeed() is set.
    protected IdGenerator idGenerator = new IdGenerator();
    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    // Connection/session/producer descriptors created by startLocalBridge() and
    // startRemoteBridge() when the respective side is brought up.
    protected ConnectionInfo localConnectionInfo;
    protected ConnectionInfo remoteConnectionInfo;
    protected SessionInfo localSessionInfo;
    protected ProducerInfo producerInfo;
    // Stays "Unknown" until collectBrokerInfos() has received the remote BrokerInfo.
    protected String remoteBrokerName = "Unknown";
    protected String localClientId;
    // Advisory consumer registered on the remote broker by startRemoteBridge();
    // only created when the bridge is not configured as a static bridge.
    protected ConsumerInfo demandConsumerInfo;
    protected int demandConsumerDispatched;
    // Set (via compareAndSet) by startLocalBridge() / startRemoteBridge() so each side starts once.
    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
    // Flipped to true by stop(); most service methods become no-ops once it is set.
    protected final AtomicBoolean disposed = new AtomicBoolean();
    protected BrokerId localBrokerId;
    protected ActiveMQDestination[] excludedDestinations;
    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
    protected ActiveMQDestination[] staticallyIncludedDestinations;
    protected ActiveMQDestination[] durableDestinations;
    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
    protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
    protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
    // Counted down once by startLocalBridge() and once by startRemoteBridge();
    // stop() counts it down twice to release any thread still waiting for start-up.
    protected final CountDownLatch startedLatch = new CountDownLatch(2);
    // Counted down by startLocalBridge() and by stop().
    protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
    // Counted down after setupStaticDestinations() in doStartLocalAndRemoteBridges() and by stop();
    // serviceRemoteCommand() awaits it before handling a BrokerSubscriptionInfo.
    protected final CountDownLatch staticDestinationsLatch = new CountDownLatch(1);
    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
    protected NetworkBridgeConfiguration configuration;
    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();

    // Element 0 is filled with the remote broker id in collectBrokerInfos().
    protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
    protected BrokerId remoteBrokerId;

    protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics();

    private NetworkBridgeListener networkBridgeListener;
    private boolean createdByDuplex;
    private BrokerInfo localBrokerInfo;
    private BrokerInfo remoteBrokerInfo;

    // Futures that collectBrokerInfos() blocks on. Note that the BrokerInfo fields passed
    // here are still null at field-initialisation time; the remote future is completed
    // from serviceRemoteCommand() when a BrokerInfo command arrives.
    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);

    // Guards start()/stop() so that each only runs once per transition.
    private final AtomicBoolean started = new AtomicBoolean();
    // Recorded by duplexStart().
    private TransportConnection duplexInitiatingConnection;
    private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
    protected BrokerService brokerService = null;
    private ObjectName mbeanObjectName;
    // Shut down (with a 5 second grace period) by the stop task in stop().
    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
    // Extra local transport created in start() when isDuplex() is true.
    private Transport duplexInboundLocalBroker = null;
    // Producer registered on duplexInboundLocalBroker by startLocalBridge() for duplex bridges.
    private ProducerInfo duplexInboundLocalProducerInfo;
167
168    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
169        this.configuration = configuration;
170        this.localBroker = localBroker;
171        this.remoteBroker = remoteBroker;
172    }
173
174    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
175        this.localBrokerInfo = localBrokerInfo;
176        this.remoteBrokerInfo = remoteBrokerInfo;
177        this.duplexInitiatingConnection = connection;
178        start();
179        serviceRemoteCommand(remoteBrokerInfo);
180    }
181
182    @Override
183    public void start() throws Exception {
184        if (started.compareAndSet(false, true)) {
185
186            if (brokerService == null) {
187                throw new IllegalArgumentException("BrokerService is null on " + this);
188            }
189
190            networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
191
192            if (isDuplex()) {
193                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
194                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
195
196                    @Override
197                    public void onCommand(Object o) {
198                        Command command = (Command) o;
199                        serviceLocalCommand(command);
200                    }
201
202                    @Override
203                    public void onException(IOException error) {
204                        serviceLocalException(error);
205                    }
206                });
207                duplexInboundLocalBroker.start();
208            }
209
210            localBroker.setTransportListener(new DefaultTransportListener() {
211
212                @Override
213                public void onCommand(Object o) {
214                    Command command = (Command) o;
215                    serviceLocalCommand(command);
216                }
217
218                @Override
219                public void onException(IOException error) {
220                    if (!futureLocalBrokerInfo.isDone()) {
221                        futureLocalBrokerInfo.cancel(true);
222                        return;
223                    }
224                    serviceLocalException(error);
225                }
226            });
227
228            remoteBroker.setTransportListener(new DefaultTransportListener() {
229
230                @Override
231                public void onCommand(Object o) {
232                    Command command = (Command) o;
233                    serviceRemoteCommand(command);
234                }
235
236                @Override
237                public void onException(IOException error) {
238                    if (!futureRemoteBrokerInfo.isDone()) {
239                        futureRemoteBrokerInfo.cancel(true);
240                        return;
241                    }
242                    serviceRemoteException(error);
243                }
244            });
245
246            remoteBroker.start();
247            localBroker.start();
248
249            if (!disposed.get()) {
250                try {
251                    triggerStartAsyncNetworkBridgeCreation();
252                } catch (IOException e) {
253                    LOG.warn("Caught exception from remote start", e);
254                }
255            } else {
256                LOG.warn("Bridge was disposed before the start() method was fully executed.");
257                throw new TransportDisposedIOException();
258            }
259        }
260    }
261
262    @Override
263    public void stop() throws Exception {
264        if (started.compareAndSet(true, false)) {
265            if (disposed.compareAndSet(false, true)) {
266                LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName);
267
268                futureRemoteBrokerInfo.cancel(true);
269                futureLocalBrokerInfo.cancel(true);
270
271                NetworkBridgeListener l = this.networkBridgeListener;
272                if (l != null) {
273                    l.onStop(this);
274                }
275                try {
276                    // local start complete
277                    if (startedLatch.getCount() < 2) {
278                        LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
279                                configuration.getBrokerName(), this, remoteBrokerName
280                        });
281                        brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
282                        brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
283                    }
284
285                    remoteBridgeStarted.set(false);
286                    final CountDownLatch sendShutdown = new CountDownLatch(1);
287
288                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
289                        @Override
290                        public void run() {
291                            try {
292                                serialExecutor.shutdown();
293                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
294                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
295                                    LOG.info("pending tasks on stop {}", pendingTasks);
296                                }
297                                localBroker.oneway(new ShutdownInfo());
298                                remoteBroker.oneway(new ShutdownInfo());
299                            } catch (Throwable e) {
300                                LOG.debug("Caught exception sending shutdown", e);
301                            } finally {
302                                sendShutdown.countDown();
303                            }
304
305                        }
306                    }, "ActiveMQ ForwardingBridge StopTask");
307
308                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
309                        LOG.info("Network Could not shutdown in a timely manner");
310                    }
311                } finally {
312                    ServiceStopper ss = new ServiceStopper();
313                    ss.stop(remoteBroker);
314                    ss.stop(localBroker);
315                    ss.stop(duplexInboundLocalBroker);
316                    // Release the started Latch since another thread could be
317                    // stuck waiting for it to start up.
318                    startedLatch.countDown();
319                    startedLatch.countDown();
320                    localStartedLatch.countDown();
321                    staticDestinationsLatch.countDown();
322
323                    ss.throwFirstException();
324                }
325            }
326
327            LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName);
328        }
329    }
330
331    protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
332        brokerService.getTaskRunnerFactory().execute(new Runnable() {
333            @Override
334            public void run() {
335                final String originalName = Thread.currentThread().getName();
336                Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
337                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
338
339                try {
340                    // First we collect the info data from both the local and remote ends
341                    collectBrokerInfos();
342
343                    // Once we have all required broker info we can attempt to start
344                    // the local and then remote sides of the bridge.
345                    doStartLocalAndRemoteBridges();
346                } finally {
347                    Thread.currentThread().setName(originalName);
348                }
349            }
350        });
351    }
352
353    private void collectBrokerInfos() {
354        int timeout = 30000;
355        TcpTransport tcpTransport = remoteBroker.narrow(TcpTransport.class);
356        if (tcpTransport != null) {
357           timeout = tcpTransport.getConnectionTimeout();
358        }
359
360        // First wait for the remote to feed us its BrokerInfo, then we can check on
361        // the LocalBrokerInfo and decide is this is a loop.
362        try {
363            remoteBrokerInfo = futureRemoteBrokerInfo.get(timeout, TimeUnit.MILLISECONDS);
364            if (remoteBrokerInfo == null) {
365                serviceLocalException(new Throwable("remoteBrokerInfo is null"));
366                return;
367            }
368        } catch (Exception e) {
369            serviceRemoteException(e);
370            return;
371        }
372
373        try {
374            localBrokerInfo = futureLocalBrokerInfo.get(timeout, TimeUnit.MILLISECONDS);
375            if (localBrokerInfo == null) {
376                serviceLocalException(new Throwable("localBrokerInfo is null"));
377                return;
378            }
379
380            // Before we try and build the bridge lets check if we are in a loop
381            // and if so just stop now before registering anything.
382            remoteBrokerId = remoteBrokerInfo.getBrokerId();
383            if (localBrokerId.equals(remoteBrokerId)) {
384                LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
385                        configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
386                });
387                ServiceSupport.dispose(localBroker);
388                ServiceSupport.dispose(remoteBroker);
389                // the bridge is left in a bit of limbo, but it won't get retried
390                // in this state.
391                return;
392            }
393
394            // Fill in the remote broker's information now.
395            remoteBrokerPath[0] = remoteBrokerId;
396            remoteBrokerName = remoteBrokerInfo.getBrokerName();
397            if (configuration.isUseBrokerNamesAsIdSeed()) {
398                idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
399            }
400        } catch (Throwable e) {
401            serviceLocalException(e);
402        }
403    }
404
405    private void doStartLocalAndRemoteBridges() {
406
407        if (disposed.get()) {
408            return;
409        }
410
411        if (isCreatedByDuplex()) {
412            // apply remote (propagated) configuration to local duplex bridge before start
413            Properties props = null;
414            try {
415                props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
416                IntrospectionSupport.getProperties(configuration, props, null);
417                if (configuration.getExcludedDestinations() != null) {
418                    excludedDestinations = configuration.getExcludedDestinations().toArray(
419                            new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
420                }
421                if (configuration.getStaticallyIncludedDestinations() != null) {
422                    staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
423                            new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
424                }
425                if (configuration.getDynamicallyIncludedDestinations() != null) {
426                    dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
427                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
428                }
429            } catch (Throwable t) {
430                LOG.error("Error mapping remote configuration: {}", props, t);
431            }
432        }
433
434        try {
435            startLocalBridge();
436        } catch (Throwable e) {
437            serviceLocalException(e);
438            return;
439        }
440
441        try {
442            startRemoteBridge();
443        } catch (Throwable e) {
444            serviceRemoteException(e);
445            return;
446        }
447
448        try {
449            if (safeWaitUntilStarted()) {
450                setupStaticDestinations();
451                staticDestinationsLatch.countDown();
452            }
453        } catch (Throwable e) {
454            serviceLocalException(e);
455        }
456    }
457
458    private void startLocalBridge() throws Throwable {
459        if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) {
460            synchronized (this) {
461                LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker);
462                if (!disposed.get()) {
463
464                    if (idGenerator == null) {
465                        throw new IllegalStateException("Id Generator cannot be null");
466                    }
467
468                    localConnectionInfo = new ConnectionInfo();
469                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
470                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
471                    localConnectionInfo.setClientId(localClientId);
472                    localConnectionInfo.setUserName(configuration.getUserName());
473                    localConnectionInfo.setPassword(configuration.getPassword());
474                    Transport originalTransport = remoteBroker;
475                    while (originalTransport instanceof TransportFilter) {
476                        originalTransport = ((TransportFilter) originalTransport).getNext();
477                    }
478                    if (originalTransport instanceof SslTransport) {
479                        X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
480                        localConnectionInfo.setTransportContext(peerCerts);
481                    }
482                    // sync requests that may fail
483                    Object resp = localBroker.request(localConnectionInfo);
484                    if (resp instanceof ExceptionResponse) {
485                        throw ((ExceptionResponse) resp).getException();
486                    }
487                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
488                    localBroker.oneway(localSessionInfo);
489
490                    if (configuration.isDuplex()) {
491                        // separate in-bound channel for forwards so we don't
492                        // contend with out-bound dispatch on same connection
493                        remoteBrokerInfo.setNetworkConnection(true);
494                        duplexInboundLocalBroker.oneway(remoteBrokerInfo);
495
496                        ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
497                        duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
498                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
499                                + configuration.getBrokerName());
500                        duplexLocalConnectionInfo.setUserName(configuration.getUserName());
501                        duplexLocalConnectionInfo.setPassword(configuration.getPassword());
502
503                        if (originalTransport instanceof SslTransport) {
504                            X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
505                            duplexLocalConnectionInfo.setTransportContext(peerCerts);
506                        }
507                        // sync requests that may fail
508                        resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
509                        if (resp instanceof ExceptionResponse) {
510                            throw ((ExceptionResponse) resp).getException();
511                        }
512                        SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
513                        duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
514                        duplexInboundLocalBroker.oneway(duplexInboundSession);
515                        duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
516                    }
517                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
518                    NetworkBridgeListener l = this.networkBridgeListener;
519                    if (l != null) {
520                        l.onStart(this);
521                    }
522
523                    // Let the local broker know the remote broker's ID.
524                    localBroker.oneway(remoteBrokerInfo);
525                    // new peer broker (a consumer can work with remote broker also)
526                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
527
528                    LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
529                            localBroker, remoteBroker, remoteBrokerName
530                    });
531                    LOG.trace("{} register bridge ({}) to {}", new Object[]{
532                            configuration.getBrokerName(), this, remoteBrokerName
533                    });
534                } else {
535                    LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
536                }
537                startedLatch.countDown();
538                localStartedLatch.countDown();
539            }
540        }
541    }
542
543    protected void startRemoteBridge() throws Exception {
544        if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) {
545            LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker);
546            synchronized (this) {
547                if (!isCreatedByDuplex()) {
548                    BrokerInfo brokerInfo = new BrokerInfo();
549                    brokerInfo.setBrokerName(configuration.getBrokerName());
550                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
551                    brokerInfo.setNetworkConnection(true);
552                    brokerInfo.setDuplexConnection(configuration.isDuplex());
553                    // set our properties
554                    Properties props = new Properties();
555                    IntrospectionSupport.getProperties(configuration, props, null);
556
557                    String dynamicallyIncludedDestinationsKey = "dynamicallyIncludedDestinations";
558                    String staticallyIncludedDestinationsKey = "staticallyIncludedDestinations";
559
560                    if (!configuration.getDynamicallyIncludedDestinations().isEmpty()) {
561                        props.put(dynamicallyIncludedDestinationsKey,
562                                StringToListOfActiveMQDestinationConverter.
563                                convertFromActiveMQDestination(configuration.getDynamicallyIncludedDestinations(), true));
564                    }
565                    if (!configuration.getStaticallyIncludedDestinations().isEmpty()) {
566                        props.put(staticallyIncludedDestinationsKey,
567                                StringToListOfActiveMQDestinationConverter.
568                                convertFromActiveMQDestination(configuration.getStaticallyIncludedDestinations(), true));
569                    }
570
571                    props.remove("networkTTL");
572                    String str = MarshallingSupport.propertiesToString(props);
573                    brokerInfo.setNetworkProperties(str);
574                    brokerInfo.setBrokerId(this.localBrokerId);
575                    remoteBroker.oneway(brokerInfo);
576                    if (configuration.isSyncDurableSubs() &&
577                            remoteBroker.getWireFormat().getVersion() >= CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
578                        remoteBroker.oneway(TransportConnection.getBrokerSubscriptionInfo(brokerService));
579                    }
580                }
581                if (remoteConnectionInfo != null) {
582                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
583                }
584                remoteConnectionInfo = new ConnectionInfo();
585                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
586                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
587                remoteConnectionInfo.setUserName(configuration.getUserName());
588                remoteConnectionInfo.setPassword(configuration.getPassword());
589                remoteBroker.oneway(remoteConnectionInfo);
590
591                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
592                remoteBroker.oneway(remoteSessionInfo);
593                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
594                producerInfo.setResponseRequired(false);
595                remoteBroker.oneway(producerInfo);
596                // Listen to consumer advisory messages on the remote broker to determine demand.
597                if (!configuration.isStaticBridge()) {
598                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
599                    // always dispatch advisory message asynchronously so that
600                    // we never block the producer broker if we are slow
601                    demandConsumerInfo.setDispatchAsync(true);
602                    String advisoryTopic = configuration.getDestinationFilter();
603                    if (configuration.isBridgeTempDestinations()) {
604                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
605                    }
606                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
607                    configureConsumerPrefetch(demandConsumerInfo);
608                    remoteBroker.oneway(demandConsumerInfo);
609                }
610                startedLatch.countDown();
611            }
612        }
613    }
614
615    @Override
616    public void serviceRemoteException(Throwable error) {
617        if (!disposed.get()) {
618            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
619                LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
620                        localBroker, remoteBroker, error
621                });
622            } else {
623                LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
624                        localBroker, remoteBroker, error
625                });
626            }
627            LOG.debug("The remote Exception was: {}", error, error);
628            brokerService.getTaskRunnerFactory().execute(new Runnable() {
629                @Override
630                public void run() {
631                    ServiceSupport.dispose(getControllingService());
632                }
633            });
634            fireBridgeFailed(error);
635        }
636    }
637
    /**
     * Processes a command received from the remote broker.
     * <p>
     * Commands are ignored once the bridge is disposed. Otherwise the command type
     * selects the handling:
     * <ul>
     * <li>message dispatch - the payload is applied as a consumer advisory and then acked;</li>
     * <li>broker info - completes {@code futureRemoteBrokerInfo};</li>
     * <li>{@link BrokerSubscriptionInfo} - optionally replays the listed subscriptions and
     * cleans up durable demand subscriptions that are no longer needed;</li>
     * <li>{@link ConnectionError} - routed to {@link #serviceRemoteException(Throwable)};</li>
     * <li>anything else - handled as a duplex command when the bridge is duplex, otherwise
     * ignored or logged as unexpected.</li>
     * </ul>
     * Any {@link Throwable} raised while handling the command is passed to
     * {@link #serviceRemoteException(Throwable)}.
     *
     * @param command the command sent by the remote broker
     */
    protected void serviceRemoteCommand(Command command) {
        if (!disposed.get()) {
            try {
                if (command.isMessageDispatch()) {
                    // the payload of the dispatched message is handled as a consumer advisory
                    safeWaitUntilStarted();
                    MessageDispatch md = (MessageDispatch) command;
                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
                    ackAdvisory(md.getMessage());
                } else if (command.isBrokerInfo()) {
                    futureRemoteBrokerInfo.set((BrokerInfo) command);
                } else if (command instanceof BrokerSubscriptionInfo) {
                    // blocks this thread until staticDestinationsLatch has been released
                    staticDestinationsLatch.await();
                    BrokerSubscriptionInfo subInfo = (BrokerSubscriptionInfo) command;
                    LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
                            this.brokerService.getBrokerName(), subInfo.getBrokerName());

                    // only acted upon when durable sync and conduit subscriptions are enabled
                    // and the bridge is not dynamic-only
                    if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
                            && !configuration.isDynamicOnly()) {
                        if (started.get()) {
                            if (subInfo.getSubscriptionInfos() != null) {
                                for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
                                    // skip names starting with DURABLE_SUB_PREFIX (presumably subscriptions
                                    // created by a network bridge - TODO confirm) and destinations outside
                                    // the dynamically included set
                                    if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
                                            matchesDynamicallyIncludedDestinations(info.getDestination())) {
                                        serviceRemoteConsumerAdvisory(info);
                                    }
                                }
                            }

                            // After the subscriptions were re-added, clean up any empty durables
                            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                                DemandSubscription ds = i.next();
                                if (matchesDynamicallyIncludedDestinations(ds.getLocalInfo().getDestination())) {
                                    cleanupDurableSub(ds, i);
                                }
                            }
                        }
                    }
                } else if (command.getClass() == ConnectionError.class) {
                    ConnectionError ce = (ConnectionError) command;
                    serviceRemoteException(ce.getException());
                } else {
                    if (isDuplex()) {
                        LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
                        if (command.isMessage()) {
                            final ActiveMQMessage message = (ActiveMQMessage) command;
                            if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
                                // advisory arriving over the duplex channel: same handling as a dispatched advisory
                                serviceRemoteConsumerAdvisory(message.getDataStructure());
                                ackAdvisory(message);
                            } else {
                                // temporary destinations are always allowed here (allowTemporary = true)
                                if (!isPermissableDestination(message.getDestination(), true)) {
                                    return;
                                }
                                // message being forwarded - we need to
                                // propagate the response to our local send
                                if (canDuplexDispatch(message)) {
                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
                                            // captured when the callback is created, before the local send completes
                                            final int correlationId = message.getCommandId();

                                            @Override
                                            public void onCompletion(FutureResponse resp) {
                                                try {
                                                    // relay the local broker's response to the remote sender
                                                    Response reply = resp.getResult();
                                                    reply.setCorrelationId(correlationId);
                                                    remoteBroker.oneway(reply);
                                                    //increment counter when messages are received in duplex mode
                                                    networkBridgeStatistics.getReceivedCount().increment();
                                                } catch (IOException error) {
                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
                                                    serviceRemoteException(error);
                                                }
                                            }
                                        });
                                    } else {
                                        duplexInboundLocalBroker.oneway(message);
                                        networkBridgeStatistics.getReceivedCount().increment();
                                    }
                                    serviceInboundMessage(message);
                                } else {
                                    // not dispatched locally: still answer the sender if it expects a response
                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
                                        Response reply = new Response();
                                        reply.setCorrelationId(message.getCommandId());
                                        remoteBroker.oneway(reply);
                                    }
                                }
                            }
                        } else {
                            switch (command.getDataStructureType()) {
                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
                                    // only the first ConnectionInfo is handed to the initiating connection;
                                    // later ones go to the local broker
                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
                                        // end of initiating connection setup - propagate to initial connection to get mbean by clientid
                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
                                    } else {
                                        localBroker.oneway(command);
                                    }
                                    break;
                                case SessionInfo.DATA_STRUCTURE_TYPE:
                                    localBroker.oneway(command);
                                    break;
                                case ProducerInfo.DATA_STRUCTURE_TYPE:
                                    // using duplexInboundLocalProducerInfo
                                    break;
                                case MessageAck.DATA_STRUCTURE_TYPE:
                                    // translate the remote consumer id to the local subscription before acking locally
                                    MessageAck ack = (MessageAck) command;
                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
                                    if (localSub != null) {
                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
                                        localBroker.oneway(ack);
                                    } else {
                                        LOG.warn("Matching local subscription not found for ack: {}", ack);
                                    }
                                    break;
                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
                                    // blocks this thread until localStartedLatch has been released
                                    localStartedLatch.await();
                                    if (started.get()) {
                                        addConsumerInfo((ConsumerInfo) command);
                                    } else {
                                        // received a subscription whilst stopping
                                        LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
                                    }
                                    break;
                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
                                    // initiator is shutting down, controlled case
                                    // abortive close dealt with by inactivity monitor
                                    LOG.info("Stopping network bridge on shutdown of remote broker");
                                    serviceRemoteException(new IOException(command.toString()));
                                    break;
                                default:
                                    LOG.debug("Ignoring remote command: {}", command);
                            }
                        }
                    } else {
                        // non-duplex bridge: these command types are silently ignored, anything else is unexpected
                        switch (command.getDataStructureType()) {
                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
                                break;
                            default:
                                LOG.warn("Unexpected remote command: {}", command);
                        }
                    }
                }
            } catch (Throwable e) {
                // any failure (including an interrupt while waiting on a latch) is treated as a remote error
                LOG.debug("Exception processing remote command: {}", command, e);
                serviceRemoteException(e);
            }
        }
    }
787
788    private void ackAdvisory(Message message) throws IOException {
789        demandConsumerDispatched++;
790        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
791                (configuration.getAdvisoryAckPercentage() / 100f))) {
792            MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
793            ack.setConsumerId(demandConsumerInfo.getConsumerId());
794            remoteBroker.oneway(ack);
795            demandConsumerDispatched = 0;
796        }
797    }
798
799    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
800        final int networkTTL = configuration.getConsumerTTL();
801        if (data.getClass() == ConsumerInfo.class) {
802            // Create a new local subscription
803            ConsumerInfo info = (ConsumerInfo) data;
804            BrokerId[] path = info.getBrokerPath();
805
806            if (info.isBrowser()) {
807                LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
808                return;
809            }
810
811            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
812                LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
813                        configuration.getBrokerName(), remoteBrokerName, networkTTL, info
814                });
815                return;
816            }
817
818            if (contains(path, localBrokerPath[0])) {
819                // Ignore this consumer as it's a consumer we locally sent to the broker.
820                LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
821                        configuration.getBrokerName(), remoteBrokerName, info
822                });
823                return;
824            }
825
826            if (!isPermissableDestination(info.getDestination())) {
827                // ignore if not in the permitted or in the excluded list
828                LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
829                        configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
830                });
831                return;
832            }
833
834            // in a cyclic network there can be multiple bridges per broker that can propagate
835            // a network subscription so there is a need to synchronize on a shared entity
836            synchronized (brokerService.getVmConnectorURI()) {
837                addConsumerInfo(info);
838            }
839        } else if (data.getClass() == DestinationInfo.class) {
840            // It's a destination info - we want to pass up information about temporary destinations
841            final DestinationInfo destInfo = (DestinationInfo) data;
842            BrokerId[] path = destInfo.getBrokerPath();
843            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
844                LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
845                        configuration.getBrokerName(), destInfo, networkTTL
846                });
847                return;
848            }
849            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
850                LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
851                return;
852            }
853            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
854            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
855                // re-set connection id so comes from here
856                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
857                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
858            }
859            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
860            LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
861                    configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
862            });
863            if (destInfo.isRemoveOperation()) {
864                // Serialize with removeSub operations such that all removeSub advisories
865                // are generated
866                serialExecutor.execute(new Runnable() {
867                    @Override
868                    public void run() {
869                        try {
870                            localBroker.oneway(destInfo);
871                        } catch (IOException e) {
872                            LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
873                        }
874                    }
875                });
876            } else {
877                localBroker.oneway(destInfo);
878            }
879        } else if (data.getClass() == RemoveInfo.class) {
880            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
881            removeDemandSubscription(id);
882
883            if (forcedDurableRemoteId.remove(id)) {
884                for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
885                    DemandSubscription ds = i.next();
886                    boolean removed = ds.removeForcedDurableConsumer(id);
887                    if (removed) {
888                        cleanupDurableSub(ds, i);
889                    }
890                }
891           }
892
893        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
894            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
895            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
896            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
897                DemandSubscription ds = i.next();
898                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
899                if (removed) {
900                    cleanupDurableSub(ds, i);
901                }
902            }
903        }
904    }
905
906    private void cleanupDurableSub(final DemandSubscription ds,
907            Iterator<DemandSubscription> i) throws IOException {
908        if (ds != null && ds.getLocalDurableSubscriber() != null && ds.getDurableRemoteSubs().isEmpty()
909                && ds.getForcedDurableConsumersSize() == 0) {
910
911            // deactivate subscriber
912            RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
913            localBroker.oneway(removeInfo);
914
915            // remove subscriber
916            RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
917            sending.setClientId(localClientId);
918            sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
919            sending.setConnectionId(this.localConnectionInfo.getConnectionId());
920            localBroker.oneway(sending);
921
922            //remove subscriber from map
923            i.remove();
924        }
925    }
926
927    @Override
928    public void serviceLocalException(Throwable error) {
929        serviceLocalException(null, error);
930    }
931
932    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
933        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
934        if (!disposed.get()) {
935            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
936                // not a reason to terminate the bridge - temps can disappear with
937                // pending sends as the demand sub may outlive the remote dest
938                if (messageDispatch != null) {
939                    LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
940                    try {
941                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
942                        poisonAck.setPoisonCause(error);
943                        localBroker.oneway(poisonAck);
944                    } catch (IOException ioe) {
945                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
946                    }
947                    fireFailedForwardAdvisory(messageDispatch, error);
948                } else {
949                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
950                }
951                return;
952            }
953
954            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
955            LOG.debug("The local Exception was: {}", error, error);
956
957            brokerService.getTaskRunnerFactory().execute(new Runnable() {
958                @Override
959                public void run() {
960                    ServiceSupport.dispose(getControllingService());
961                }
962            });
963            fireBridgeFailed(error);
964        }
965    }
966
967    private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
968        if (configuration.isAdvisoryForFailedForward()) {
969            AdvisoryBroker advisoryBroker = null;
970            try {
971                advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
972
973                if (advisoryBroker != null) {
974                    ConnectionContext context = new ConnectionContext();
975                    context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
976                    context.setBroker(brokerService.getBroker());
977
978                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
979                    advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
980                    advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
981                            advisoryMessage);
982
983                }
984            } catch (Exception e) {
985                LOG.warn("failed to fire forward failure advisory, cause: {}", e);
986                LOG.debug("detail", e);
987            }
988        }
989    }
990
991    protected Service getControllingService() {
992        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
993    }
994
995    protected void addSubscription(DemandSubscription sub) throws IOException {
996        if (sub != null) {
997            if (isDuplex()) {
998                // async vm transport, need to wait for completion
999                localBroker.request(sub.getLocalInfo());
1000            } else {
1001                localBroker.oneway(sub.getLocalInfo());
1002            }
1003        }
1004    }
1005
1006    protected void removeSubscription(final DemandSubscription sub) throws IOException {
1007        if (sub != null) {
1008            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
1009
1010            // ensure not available for conduit subs pending removal
1011            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1012            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1013
1014            // continue removal in separate thread to free up this thread for outstanding responses
1015            // Serialize with removeDestination operations so that removeSubs are serialized with
1016            // removeDestinations such that all removeSub advisories are generated
1017            serialExecutor.execute(new Runnable() {
1018                @Override
1019                public void run() {
1020                    sub.waitForCompletion();
1021                    try {
1022                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
1023                    } catch (IOException e) {
1024                        LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
1025                    }
1026                }
1027            });
1028        }
1029    }
1030
1031    protected Message configureMessage(MessageDispatch md) throws IOException {
1032        Message message = md.getMessage().copy();
1033        // Update the packet to show where it came from.
1034        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
1035        message.setProducerId(producerInfo.getProducerId());
1036        message.setDestination(md.getDestination());
1037        message.setMemoryUsage(null);
1038        if (message.getOriginalTransactionId() == null) {
1039            message.setOriginalTransactionId(message.getTransactionId());
1040        }
1041        message.setTransactionId(null);
1042        if (configuration.isUseCompression()) {
1043            message.compress();
1044        }
1045        return message;
1046    }
1047
1048    protected void serviceLocalCommand(Command command) {
1049        if (!disposed.get()) {
1050            try {
1051                if (command.isMessageDispatch()) {
1052                    safeWaitUntilStarted();
1053                    networkBridgeStatistics.getEnqueues().increment();
1054                    final MessageDispatch md = (MessageDispatch) command;
1055                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
1056                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
1057
1058                        if (suppressMessageDispatch(md, sub)) {
1059                            LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
1060                                    configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
1061                            });
1062                            // still ack as it may be durable
1063                            try {
1064                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1065                            } finally {
1066                                sub.decrementOutstandingResponses();
1067                            }
1068                            return;
1069                        }
1070
1071                        Message message = configureMessage(md);
1072                        LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
1073                                configuration.getBrokerName(), remoteBrokerName, md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), (LOG.isTraceEnabled() ? message : message.getMessageId())
1074                        });
1075                        if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
1076                            try {
1077                                // never request b/c they are eventually                     acked async
1078                                remoteBroker.oneway(message);
1079                            } finally {
1080                                sub.decrementOutstandingResponses();
1081                            }
1082                            return;
1083                        }
1084                        if (isPermissableDestination(md.getDestination())) {
1085                           if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
1086
1087                              // The message was not sent using async send, so we should only
1088                              // ack the local broker when we get confirmation that the remote
1089                              // broker has received the message.
1090                              remoteBroker.asyncRequest(message, new ResponseCallback() {
1091                                 @Override
1092                                 public void onCompletion(FutureResponse future) {
1093                                    try {
1094                                       Response response = future.getResult();
1095                                       if (response.isException()) {
1096                                          ExceptionResponse er = (ExceptionResponse) response;
1097                                          serviceLocalException(md, er.getException());
1098                                       } else {
1099                                          localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1100                                          networkBridgeStatistics.getDequeues().increment();
1101                                       }
1102                                    } catch (IOException e) {
1103                                       serviceLocalException(md, e);
1104                                    } finally {
1105                                       sub.decrementOutstandingResponses();
1106                                    }
1107                                 }
1108                              });
1109
1110                           } else {
1111                              // If the message was originally sent using async send, we will
1112                              // preserve that QOS by bridging it using an async send (small chance
1113                              // of message loss).
1114                              try {
1115                                 remoteBroker.oneway(message);
1116                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1117                                 networkBridgeStatistics.getDequeues().increment();
1118                              } finally {
1119                                 sub.decrementOutstandingResponses();
1120                              }
1121                           }
1122                           serviceOutbound(message);
1123                        }
1124                    } else {
1125                        LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
1126                    }
1127                } else if (command.isBrokerInfo()) {
1128                    futureLocalBrokerInfo.set((BrokerInfo) command);
1129                } else if (command.isShutdownInfo()) {
1130                    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
1131                    stop();
1132                } else if (command.getClass() == ConnectionError.class) {
1133                    ConnectionError ce = (ConnectionError) command;
1134                    serviceLocalException(ce.getException());
1135                } else {
1136                    switch (command.getDataStructureType()) {
1137                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
1138                            break;
1139                        case BrokerSubscriptionInfo.DATA_STRUCTURE_TYPE:
1140                            break;
1141                        default:
1142                            LOG.warn("Unexpected local command: {}", command);
1143                    }
1144                }
1145            } catch (Throwable e) {
1146                LOG.warn("Caught an exception processing local command", e);
1147                serviceLocalException(e);
1148            }
1149        }
1150    }
1151
1152    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1153        boolean suppress = false;
1154        // for durable subs, suppression via filter leaves dangling acks so we
1155        // need to check here and allow the ack irrespective
1156        if (sub.getLocalInfo().isDurable()) {
1157            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
1158            messageEvalContext.setMessageReference(md.getMessage());
1159            messageEvalContext.setDestination(md.getDestination());
1160            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1161        }
1162        return suppress;
1163    }
1164
1165    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1166        if (brokerPath != null) {
1167            for (BrokerId id : brokerPath) {
1168                if (brokerId.equals(id)) {
1169                    return true;
1170                }
1171            }
1172        }
1173        return false;
1174    }
1175
1176    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1177        if (brokerPath == null || brokerPath.length == 0) {
1178            return pathsToAppend;
1179        }
1180        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1181        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1182        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1183        return rc;
1184    }
1185
1186    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1187        if (brokerPath == null || brokerPath.length == 0) {
1188            return new BrokerId[]{idToAppend};
1189        }
1190        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1191        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1192        rc[brokerPath.length] = idToAppend;
1193        return rc;
1194    }
1195
1196    protected boolean isPermissableDestination(ActiveMQDestination destination) {
1197        return isPermissableDestination(destination, false);
1198    }
1199
1200    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1201        // Are we not bridging temporary destinations?
1202        if (destination.isTemporary()) {
1203            if (allowTemporary) {
1204                return true;
1205            } else {
1206                return configuration.isBridgeTempDestinations();
1207            }
1208        }
1209
1210        ActiveMQDestination[] dests = excludedDestinations;
1211        if (dests != null && dests.length > 0) {
1212            for (ActiveMQDestination dest : dests) {
1213                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1214                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1215                    return false;
1216                }
1217            }
1218        }
1219
1220        dests = staticallyIncludedDestinations;
1221        if (dests != null && dests.length > 0) {
1222            for (ActiveMQDestination dest : dests) {
1223                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1224                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1225                    return true;
1226                }
1227            }
1228        }
1229
1230        dests = dynamicallyIncludedDestinations;
1231        if (dests != null && dests.length > 0) {
1232            for (ActiveMQDestination dest : dests) {
1233                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1234                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1235                    return true;
1236                }
1237            }
1238
1239            return false;
1240        }
1241
1242        return true;
1243    }
1244
1245    private boolean matchesDynamicallyIncludedDestinations(ActiveMQDestination destination) {
1246        ActiveMQDestination[] dests = dynamicallyIncludedDestinations;
1247        if (dests != null && dests.length > 0) {
1248            for (ActiveMQDestination dest : dests) {
1249                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1250                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1251                    return true;
1252                }
1253            }
1254        }
1255
1256        return false;
1257    }
1258
1259    protected ActiveMQDestination findMatchingDestination(ActiveMQDestination[] dests, ActiveMQDestination destination) {
1260        if (dests != null && dests.length > 0) {
1261            for (ActiveMQDestination dest : dests) {
1262                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1263                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1264                    return dest;
1265                }
1266            }
1267        }
1268
1269        return null;
1270    }
1271
1272    /**
1273     * Subscriptions for these destinations are always created
1274     */
1275    protected void setupStaticDestinations() {
1276        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1277        if (dests != null) {
1278            for (ActiveMQDestination dest : dests) {
1279                if (isPermissableDestination(dest)) {
1280                    DemandSubscription sub = createDemandSubscription(dest, null);
1281                    sub.setStaticallyIncluded(true);
1282                    try {
1283                        addSubscription(sub);
1284                    } catch (IOException e) {
1285                        LOG.error("Failed to add static destination {}", dest, e);
1286                    }
1287                    LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
1288                } else {
1289                    LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
1290                }
1291            }
1292        }
1293    }
1294
1295    protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1296        ConsumerInfo info = consumerInfo.copy();
1297        addRemoteBrokerToBrokerPath(info);
1298        DemandSubscription sub = createDemandSubscription(info);
1299        if (sub != null) {
1300            if (duplicateSuppressionIsRequired(sub)) {
1301                undoMapRegistration(sub);
1302            } else {
1303                if (consumerInfo.isDurable()) {
1304                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
1305                }
1306                addSubscription(sub);
1307                LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);
1308            }
1309        }
1310    }
1311
1312    private void undoMapRegistration(DemandSubscription sub) {
1313        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1314        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1315    }
1316
1317    /*
1318     * check our existing subs networkConsumerIds against the list of network
1319     * ids in this subscription A match means a duplicate which we suppress for
1320     * topics and maybe for queues
1321     */
1322    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1323        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1324        boolean suppress = false;
1325
1326        if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
1327                && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1328            return suppress;
1329        }
1330
1331        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1332        Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1333        for (Subscription sub : currentSubs) {
1334            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1335            if (!networkConsumers.isEmpty()) {
1336                if (matchFound(candidateConsumers, networkConsumers)) {
1337                    if (isInActiveDurableSub(sub)) {
1338                        suppress = false;
1339                    } else {
1340                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1341                    }
1342                    break;
1343                }
1344            }
1345        }
1346        return suppress;
1347    }
1348
1349    private boolean isInActiveDurableSub(Subscription sub) {
1350        return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1351    }
1352
1353    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1354        boolean suppress = false;
1355
1356        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1357            LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
1358                    configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
1359            });
1360            suppress = true;
1361        } else {
1362            // remove the existing lower priority duplicate and allow this candidate
1363            try {
1364                removeDuplicateSubscription(existingSub);
1365
1366                LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
1367                        configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
1368                });
1369            } catch (IOException e) {
1370                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
1371            }
1372        }
1373        return suppress;
1374    }
1375
1376    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1377        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1378            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1379                break;
1380            }
1381        }
1382    }
1383
1384    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1385        boolean found = false;
1386        for (ConsumerId aliasConsumer : networkConsumers) {
1387            if (candidateConsumers.contains(aliasConsumer)) {
1388                found = true;
1389                break;
1390            }
1391        }
1392        return found;
1393    }
1394
1395    protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1396        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1397        Region region;
1398        Collection<Subscription> subs;
1399
1400        region = null;
1401        switch (dest.getDestinationType()) {
1402            case ActiveMQDestination.QUEUE_TYPE:
1403                region = region_broker.getQueueRegion();
1404                break;
1405            case ActiveMQDestination.TOPIC_TYPE:
1406                region = region_broker.getTopicRegion();
1407                break;
1408            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1409                region = region_broker.getTempQueueRegion();
1410                break;
1411            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1412                region = region_broker.getTempTopicRegion();
1413                break;
1414        }
1415
1416        if (region instanceof AbstractRegion) {
1417            subs = ((AbstractRegion) region).getSubscriptions().values();
1418        } else {
1419            subs = null;
1420        }
1421
1422        return subs;
1423    }
1424
1425    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1426        // add our original id to ourselves
1427        info.addNetworkConsumerId(info.getConsumerId());
1428        return doCreateDemandSubscription(info);
1429    }
1430
1431    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1432        DemandSubscription result = new DemandSubscription(info);
1433        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1434        if (info.getDestination().isTemporary()) {
1435            // reset the local connection Id
1436            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1437            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1438        }
1439
1440        if (configuration.isDecreaseNetworkConsumerPriority()) {
1441            byte priority = (byte) configuration.getConsumerPriorityBase();
1442            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1443                // The longer the path to the consumer, the less it's consumer priority.
1444                priority -= info.getBrokerPath().length + 1;
1445            }
1446            result.getLocalInfo().setPriority(priority);
1447            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
1448        }
1449        configureDemandSubscription(info, result);
1450        return result;
1451    }
1452
1453    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination, final String subscriptionName) {
1454        ConsumerInfo info = new ConsumerInfo();
1455        info.setNetworkSubscription(true);
1456        info.setDestination(destination);
1457
1458        if (subscriptionName != null) {
1459            info.setSubscriptionName(subscriptionName);
1460        }
1461
1462        // Indicate that this subscription is being made on behalf of the remote broker.
1463        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
1464
1465        // the remote info held by the DemandSubscription holds the original
1466        // consumerId, the local info get's overwritten
1467        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1468        DemandSubscription result = null;
1469        try {
1470            result = createDemandSubscription(info);
1471        } catch (IOException e) {
1472            LOG.error("Failed to create DemandSubscription ", e);
1473        }
1474        return result;
1475    }
1476
1477    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1478        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
1479                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
1480            sub.getLocalInfo().setDispatchAsync(true);
1481        } else {
1482            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1483        }
1484        configureConsumerPrefetch(sub.getLocalInfo());
1485        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1486        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1487
1488        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1489        if (!info.isDurable()) {
1490            // This works for now since we use a VM connection to the local broker.
1491            // may need to change if we ever subscribe to a remote broker.
1492            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1493        } else {
1494            sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
1495        }
1496    }
1497
1498    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1499        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1500        LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
1501                configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
1502        });
1503        if (sub != null) {
1504            removeSubscription(sub);
1505            LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
1506                    configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
1507            });
1508        }
1509    }
1510
1511    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1512        boolean removeDone = false;
1513        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1514        if (sub != null) {
1515            try {
1516                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1517                removeDone = true;
1518            } catch (IOException e) {
1519                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e);
1520            }
1521        }
1522        return removeDone;
1523    }
1524
1525    /**
1526     * Performs a timed wait on the started latch and then checks for disposed
1527     * before performing another wait each time the the started wait times out.
1528     */
1529    protected boolean safeWaitUntilStarted() throws InterruptedException {
1530        while (!disposed.get()) {
1531            if (startedLatch.await(1, TimeUnit.SECONDS)) {
1532                break;
1533            }
1534        }
1535        return !disposed.get();
1536    }
1537
1538    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1539        NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1540        if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1541            PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1542            if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1543                filterFactory = entry.getNetworkBridgeFilterFactory();
1544            }
1545        }
1546        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
1547    }
1548
1549    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1550        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1551    }
1552
1553    protected BrokerId[] getRemoteBrokerPath() {
1554        return remoteBrokerPath;
1555    }
1556
1557    @Override
1558    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1559        this.networkBridgeListener = listener;
1560    }
1561
1562    private void fireBridgeFailed(Throwable reason) {
1563        LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
1564        NetworkBridgeListener l = this.networkBridgeListener;
1565        if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1566            l.bridgeFailed();
1567        }
1568    }
1569
1570    /**
1571     * @return Returns the dynamicallyIncludedDestinations.
1572     */
1573    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1574        return dynamicallyIncludedDestinations;
1575    }
1576
1577    /**
1578     * @param dynamicallyIncludedDestinations
1579     *         The dynamicallyIncludedDestinations to set.
1580     */
1581    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1582        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1583    }
1584
1585    /**
1586     * @return Returns the excludedDestinations.
1587     */
1588    public ActiveMQDestination[] getExcludedDestinations() {
1589        return excludedDestinations;
1590    }
1591
1592    /**
1593     * @param excludedDestinations The excludedDestinations to set.
1594     */
1595    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1596        this.excludedDestinations = excludedDestinations;
1597    }
1598
1599    /**
1600     * @return Returns the staticallyIncludedDestinations.
1601     */
1602    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1603        return staticallyIncludedDestinations;
1604    }
1605
1606    /**
1607     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
1608     */
1609    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1610        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1611    }
1612
1613    /**
1614     * @return Returns the durableDestinations.
1615     */
1616    public ActiveMQDestination[] getDurableDestinations() {
1617        return durableDestinations;
1618    }
1619
1620    /**
1621     * @param durableDestinations The durableDestinations to set.
1622     */
1623    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1624        this.durableDestinations = durableDestinations;
1625    }
1626
1627    /**
1628     * @return Returns the localBroker.
1629     */
1630    public Transport getLocalBroker() {
1631        return localBroker;
1632    }
1633
1634    /**
1635     * @return Returns the remoteBroker.
1636     */
1637    public Transport getRemoteBroker() {
1638        return remoteBroker;
1639    }
1640
1641    /**
1642     * @return the createdByDuplex
1643     */
1644    public boolean isCreatedByDuplex() {
1645        return this.createdByDuplex;
1646    }
1647
1648    /**
1649     * @param createdByDuplex the createdByDuplex to set
1650     */
1651    public void setCreatedByDuplex(boolean createdByDuplex) {
1652        this.createdByDuplex = createdByDuplex;
1653    }
1654
1655    @Override
1656    public String getRemoteAddress() {
1657        return remoteBroker.getRemoteAddress();
1658    }
1659
1660    @Override
1661    public String getLocalAddress() {
1662        return localBroker.getRemoteAddress();
1663    }
1664
1665    @Override
1666    public String getRemoteBrokerName() {
1667        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1668    }
1669
1670    @Override
1671    public String getRemoteBrokerId() {
1672        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
1673    }
1674
1675    @Override
1676    public String getLocalBrokerName() {
1677        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1678    }
1679
1680    @Override
1681    public long getDequeueCounter() {
1682        return networkBridgeStatistics.getDequeues().getCount();
1683    }
1684
1685    @Override
1686    public long getEnqueueCounter() {
1687        return networkBridgeStatistics.getEnqueues().getCount();
1688    }
1689
1690    @Override
1691    public NetworkBridgeStatistics getNetworkBridgeStatistics() {
1692        return networkBridgeStatistics;
1693    }
1694
1695    protected boolean isDuplex() {
1696        return configuration.isDuplex() || createdByDuplex;
1697    }
1698
1699    public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1700        return subscriptionMapByRemoteId;
1701    }
1702
1703    @Override
1704    public void setBrokerService(BrokerService brokerService) {
1705        this.brokerService = brokerService;
1706        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1707        localBrokerPath[0] = localBrokerId;
1708    }
1709
1710    @Override
1711    public void setMbeanObjectName(ObjectName objectName) {
1712        this.mbeanObjectName = objectName;
1713    }
1714
1715    @Override
1716    public ObjectName getMbeanObjectName() {
1717        return mbeanObjectName;
1718    }
1719
1720    @Override
1721    public void resetStats() {
1722        networkBridgeStatistics.reset();
1723    }
1724
1725    /*
1726     * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1727     * remote sides of the network bridge.
1728     */
1729    private static class FutureBrokerInfo implements Future<BrokerInfo> {
1730
1731        private final CountDownLatch slot = new CountDownLatch(1);
1732        private final AtomicBoolean disposed;
1733        private volatile BrokerInfo info = null;
1734
1735        public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1736            this.info = info;
1737            this.disposed = disposed;
1738        }
1739
1740        @Override
1741        public boolean cancel(boolean mayInterruptIfRunning) {
1742            slot.countDown();
1743            return true;
1744        }
1745
1746        @Override
1747        public boolean isCancelled() {
1748            return slot.getCount() == 0 && info == null;
1749        }
1750
1751        @Override
1752        public boolean isDone() {
1753            return info != null;
1754        }
1755
1756        @Override
1757        public BrokerInfo get() throws InterruptedException, ExecutionException {
1758            try {
1759                if (info == null) {
1760                    while (!disposed.get()) {
1761                        if (slot.await(1, TimeUnit.SECONDS)) {
1762                            break;
1763                        }
1764                    }
1765                }
1766                return info;
1767            } catch (InterruptedException e) {
1768                Thread.currentThread().interrupt();
1769                LOG.debug("Operation interrupted: {}", e, e);
1770                throw new InterruptedException("Interrupted.");
1771            }
1772        }
1773
1774        @Override
1775        public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1776            try {
1777                if (info == null) {
1778                    long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1779
1780                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
1781                        if (slot.await(1, TimeUnit.MILLISECONDS)) {
1782                            break;
1783                        }
1784                    }
1785                    if (info == null) {
1786                        throw new TimeoutException();
1787                    }
1788                }
1789                return info;
1790            } catch (InterruptedException e) {
1791                throw new InterruptedException("Interrupted.");
1792            }
1793        }
1794
1795        public void set(BrokerInfo info) {
1796            this.info = info;
1797            this.slot.countDown();
1798        }
1799    }
1800
1801    protected void serviceOutbound(Message message) {
1802        NetworkBridgeListener l = this.networkBridgeListener;
1803        if (l != null) {
1804            l.onOutboundMessage(this, message);
1805        }
1806    }
1807
1808    protected void serviceInboundMessage(Message message) {
1809        NetworkBridgeListener l = this.networkBridgeListener;
1810        if (l != null) {
1811            l.onInboundMessage(this, message);
1812        }
1813    }
1814
1815    protected boolean canDuplexDispatch(Message message) {
1816        boolean result = true;
1817        if (configuration.isCheckDuplicateMessagesOnDuplex()){
1818            final long producerSequenceId = message.getMessageId().getProducerSequenceId();
1819            //  messages are multiplexed on this producer so we need to query the persistenceAdapter
1820            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
1821            if (producerSequenceId <= lastStoredForMessageProducer) {
1822                result = false;
1823                LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
1824                        (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
1825                });
1826            }
1827        }
1828        return result;
1829    }
1830
1831    protected long getStoredSequenceIdForMessage(MessageId messageId) {
1832        try {
1833            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
1834        } catch (IOException ignored) {
1835            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
1836        }
1837        return -1;
1838    }
1839
1840    protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) {
1841        //If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly
1842        //set then use it, else default to the prefetchSize setting
1843        if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) &&
1844                configuration.getAdvisoryPrefetchSize() > 0) {
1845            consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize());
1846        } else {
1847            consumerInfo.setPrefetchSize(configuration.getPrefetchSize());
1848        }
1849    }
1850
1851}