001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Properties;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Executors;
033import java.util.concurrent.Future;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.TimeoutException;
036import java.util.concurrent.atomic.AtomicBoolean;
037
038import javax.management.ObjectName;
039
040import org.apache.activemq.DestinationDoesNotExistException;
041import org.apache.activemq.Service;
042import org.apache.activemq.advisory.AdvisoryBroker;
043import org.apache.activemq.advisory.AdvisorySupport;
044import org.apache.activemq.broker.BrokerService;
045import org.apache.activemq.broker.BrokerServiceAware;
046import org.apache.activemq.broker.ConnectionContext;
047import org.apache.activemq.broker.TransportConnection;
048import org.apache.activemq.broker.region.AbstractRegion;
049import org.apache.activemq.broker.region.DurableTopicSubscription;
050import org.apache.activemq.broker.region.Region;
051import org.apache.activemq.broker.region.RegionBroker;
052import org.apache.activemq.broker.region.Subscription;
053import org.apache.activemq.broker.region.policy.PolicyEntry;
054import org.apache.activemq.command.ActiveMQDestination;
055import org.apache.activemq.command.ActiveMQMessage;
056import org.apache.activemq.command.ActiveMQTempDestination;
057import org.apache.activemq.command.ActiveMQTopic;
058import org.apache.activemq.command.BrokerId;
059import org.apache.activemq.command.BrokerInfo;
060import org.apache.activemq.command.Command;
061import org.apache.activemq.command.ConnectionError;
062import org.apache.activemq.command.ConnectionId;
063import org.apache.activemq.command.ConnectionInfo;
064import org.apache.activemq.command.ConsumerId;
065import org.apache.activemq.command.ConsumerInfo;
066import org.apache.activemq.command.DataStructure;
067import org.apache.activemq.command.DestinationInfo;
068import org.apache.activemq.command.ExceptionResponse;
069import org.apache.activemq.command.KeepAliveInfo;
070import org.apache.activemq.command.Message;
071import org.apache.activemq.command.MessageAck;
072import org.apache.activemq.command.MessageDispatch;
073import org.apache.activemq.command.MessageId;
074import org.apache.activemq.command.NetworkBridgeFilter;
075import org.apache.activemq.command.ProducerInfo;
076import org.apache.activemq.command.RemoveInfo;
077import org.apache.activemq.command.RemoveSubscriptionInfo;
078import org.apache.activemq.command.Response;
079import org.apache.activemq.command.SessionInfo;
080import org.apache.activemq.command.ShutdownInfo;
081import org.apache.activemq.command.SubscriptionInfo;
082import org.apache.activemq.command.WireFormatInfo;
083import org.apache.activemq.filter.DestinationFilter;
084import org.apache.activemq.filter.MessageEvaluationContext;
085import org.apache.activemq.security.SecurityContext;
086import org.apache.activemq.transport.DefaultTransportListener;
087import org.apache.activemq.transport.FutureResponse;
088import org.apache.activemq.transport.ResponseCallback;
089import org.apache.activemq.transport.Transport;
090import org.apache.activemq.transport.TransportDisposedIOException;
091import org.apache.activemq.transport.TransportFilter;
092import org.apache.activemq.transport.tcp.SslTransport;
093import org.apache.activemq.util.IdGenerator;
094import org.apache.activemq.util.IntrospectionSupport;
095import org.apache.activemq.util.LongSequenceGenerator;
096import org.apache.activemq.util.MarshallingSupport;
097import org.apache.activemq.util.ServiceStopper;
098import org.apache.activemq.util.ServiceSupport;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102/**
103 * A useful base class for implementing demand forwarding bridges.
104 */
105public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
106    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
107    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
108    protected final Transport localBroker;
109    protected final Transport remoteBroker;
110    protected IdGenerator idGenerator = new IdGenerator();
111    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
112    protected ConnectionInfo localConnectionInfo;
113    protected ConnectionInfo remoteConnectionInfo;
114    protected SessionInfo localSessionInfo;
115    protected ProducerInfo producerInfo;
116    protected String remoteBrokerName = "Unknown";
117    protected String localClientId;
118    protected ConsumerInfo demandConsumerInfo;
119    protected int demandConsumerDispatched;
120    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
121    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
122    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
123    protected final AtomicBoolean disposed = new AtomicBoolean();
124    protected BrokerId localBrokerId;
125    protected ActiveMQDestination[] excludedDestinations;
126    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
127    protected ActiveMQDestination[] staticallyIncludedDestinations;
128    protected ActiveMQDestination[] durableDestinations;
129    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
130    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
131    protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
132    protected final CountDownLatch startedLatch = new CountDownLatch(2);
133    protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
134    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
135    protected NetworkBridgeConfiguration configuration;
136    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
137
138    protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
139    protected BrokerId remoteBrokerId;
140
141    protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics();
142
143    private NetworkBridgeListener networkBridgeListener;
144    private boolean createdByDuplex;
145    private BrokerInfo localBrokerInfo;
146    private BrokerInfo remoteBrokerInfo;
147
148    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
149    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
150
151    private final AtomicBoolean started = new AtomicBoolean();
152    private TransportConnection duplexInitiatingConnection;
153    private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
154    protected BrokerService brokerService = null;
155    private ObjectName mbeanObjectName;
156    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
157    private Transport duplexInboundLocalBroker = null;
158    private ProducerInfo duplexInboundLocalProducerInfo;
159
160    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
161        this.configuration = configuration;
162        this.localBroker = localBroker;
163        this.remoteBroker = remoteBroker;
164    }
165
166    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
167        this.localBrokerInfo = localBrokerInfo;
168        this.remoteBrokerInfo = remoteBrokerInfo;
169        this.duplexInitiatingConnection = connection;
170        start();
171        serviceRemoteCommand(remoteBrokerInfo);
172    }
173
174    @Override
175    public void start() throws Exception {
176        if (started.compareAndSet(false, true)) {
177
178            if (brokerService == null) {
179                throw new IllegalArgumentException("BrokerService is null on " + this);
180            }
181
182            networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
183
184            if (isDuplex()) {
185                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
186                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
187
188                    @Override
189                    public void onCommand(Object o) {
190                        Command command = (Command) o;
191                        serviceLocalCommand(command);
192                    }
193
194                    @Override
195                    public void onException(IOException error) {
196                        serviceLocalException(error);
197                    }
198                });
199                duplexInboundLocalBroker.start();
200            }
201
202            localBroker.setTransportListener(new DefaultTransportListener() {
203
204                @Override
205                public void onCommand(Object o) {
206                    Command command = (Command) o;
207                    serviceLocalCommand(command);
208                }
209
210                @Override
211                public void onException(IOException error) {
212                    if (!futureLocalBrokerInfo.isDone()) {
213                        futureLocalBrokerInfo.cancel(true);
214                        return;
215                    }
216                    serviceLocalException(error);
217                }
218            });
219
220            remoteBroker.setTransportListener(new DefaultTransportListener() {
221
222                @Override
223                public void onCommand(Object o) {
224                    Command command = (Command) o;
225                    serviceRemoteCommand(command);
226                }
227
228                @Override
229                public void onException(IOException error) {
230                    if (!futureRemoteBrokerInfo.isDone()) {
231                        futureRemoteBrokerInfo.cancel(true);
232                        return;
233                    }
234                    serviceRemoteException(error);
235                }
236            });
237
238            remoteBroker.start();
239            localBroker.start();
240
241            if (!disposed.get()) {
242                try {
243                    triggerStartAsyncNetworkBridgeCreation();
244                } catch (IOException e) {
245                    LOG.warn("Caught exception from remote start", e);
246                }
247            } else {
248                LOG.warn("Bridge was disposed before the start() method was fully executed.");
249                throw new TransportDisposedIOException();
250            }
251        }
252    }
253
254    @Override
255    public void stop() throws Exception {
256        if (started.compareAndSet(true, false)) {
257            if (disposed.compareAndSet(false, true)) {
258                LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName);
259
260                futureRemoteBrokerInfo.cancel(true);
261                futureLocalBrokerInfo.cancel(true);
262
263                NetworkBridgeListener l = this.networkBridgeListener;
264                if (l != null) {
265                    l.onStop(this);
266                }
267                try {
268                    // local start complete
269                    if (startedLatch.getCount() < 2) {
270                        LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
271                                configuration.getBrokerName(), this, remoteBrokerName
272                        });
273                        brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
274                        brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
275                    }
276
277                    remoteBridgeStarted.set(false);
278                    final CountDownLatch sendShutdown = new CountDownLatch(1);
279
280                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
281                        @Override
282                        public void run() {
283                            try {
284                                serialExecutor.shutdown();
285                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
286                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
287                                    LOG.info("pending tasks on stop {}", pendingTasks);
288                                }
289                                localBroker.oneway(new ShutdownInfo());
290                                remoteBroker.oneway(new ShutdownInfo());
291                            } catch (Throwable e) {
292                                LOG.debug("Caught exception sending shutdown", e);
293                            } finally {
294                                sendShutdown.countDown();
295                            }
296
297                        }
298                    }, "ActiveMQ ForwardingBridge StopTask");
299
300                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
301                        LOG.info("Network Could not shutdown in a timely manner");
302                    }
303                } finally {
304                    ServiceStopper ss = new ServiceStopper();
305                    ss.stop(remoteBroker);
306                    ss.stop(localBroker);
307                    ss.stop(duplexInboundLocalBroker);
308                    // Release the started Latch since another thread could be
309                    // stuck waiting for it to start up.
310                    startedLatch.countDown();
311                    startedLatch.countDown();
312                    localStartedLatch.countDown();
313
314                    ss.throwFirstException();
315                }
316            }
317
318            LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName);
319        }
320    }
321
322    protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
323        brokerService.getTaskRunnerFactory().execute(new Runnable() {
324            @Override
325            public void run() {
326                final String originalName = Thread.currentThread().getName();
327                Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
328                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
329
330                try {
331                    // First we collect the info data from both the local and remote ends
332                    collectBrokerInfos();
333
334                    // Once we have all required broker info we can attempt to start
335                    // the local and then remote sides of the bridge.
336                    doStartLocalAndRemoteBridges();
337                } finally {
338                    Thread.currentThread().setName(originalName);
339                }
340            }
341        });
342    }
343
344    private void collectBrokerInfos() {
345
346        // First wait for the remote to feed us its BrokerInfo, then we can check on
347        // the LocalBrokerInfo and decide is this is a loop.
348        try {
349            remoteBrokerInfo = futureRemoteBrokerInfo.get();
350            if (remoteBrokerInfo == null) {
351                serviceLocalException(new Throwable("remoteBrokerInfo is null"));
352                return;
353            }
354        } catch (Exception e) {
355            serviceRemoteException(e);
356            return;
357        }
358
359        try {
360            localBrokerInfo = futureLocalBrokerInfo.get();
361            if (localBrokerInfo == null) {
362                serviceLocalException(new Throwable("localBrokerInfo is null"));
363                return;
364            }
365
366            // Before we try and build the bridge lets check if we are in a loop
367            // and if so just stop now before registering anything.
368            remoteBrokerId = remoteBrokerInfo.getBrokerId();
369            if (localBrokerId.equals(remoteBrokerId)) {
370                LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
371                        configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
372                });
373                ServiceSupport.dispose(localBroker);
374                ServiceSupport.dispose(remoteBroker);
375                // the bridge is left in a bit of limbo, but it won't get retried
376                // in this state.
377                return;
378            }
379
380            // Fill in the remote broker's information now.
381            remoteBrokerPath[0] = remoteBrokerId;
382            remoteBrokerName = remoteBrokerInfo.getBrokerName();
383            if (configuration.isUseBrokerNamesAsIdSeed()) {
384                idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
385            }
386        } catch (Throwable e) {
387            serviceLocalException(e);
388        }
389    }
390
391    private void doStartLocalAndRemoteBridges() {
392
393        if (disposed.get()) {
394            return;
395        }
396
397        if (isCreatedByDuplex()) {
398            // apply remote (propagated) configuration to local duplex bridge before start
399            Properties props = null;
400            try {
401                props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
402                IntrospectionSupport.getProperties(configuration, props, null);
403                if (configuration.getExcludedDestinations() != null) {
404                    excludedDestinations = configuration.getExcludedDestinations().toArray(
405                            new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
406                }
407                if (configuration.getStaticallyIncludedDestinations() != null) {
408                    staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
409                            new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
410                }
411                if (configuration.getDynamicallyIncludedDestinations() != null) {
412                    dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
413                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
414                }
415            } catch (Throwable t) {
416                LOG.error("Error mapping remote configuration: {}", props, t);
417            }
418        }
419
420        try {
421            startLocalBridge();
422        } catch (Throwable e) {
423            serviceLocalException(e);
424            return;
425        }
426
427        try {
428            startRemoteBridge();
429        } catch (Throwable e) {
430            serviceRemoteException(e);
431            return;
432        }
433
434        try {
435            if (safeWaitUntilStarted()) {
436                setupStaticDestinations();
437            }
438        } catch (Throwable e) {
439            serviceLocalException(e);
440        }
441    }
442
443    private void startLocalBridge() throws Throwable {
444        if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) {
445            synchronized (this) {
446                LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker);
447                if (!disposed.get()) {
448
449                    if (idGenerator == null) {
450                        throw new IllegalStateException("Id Generator cannot be null");
451                    }
452
453                    localConnectionInfo = new ConnectionInfo();
454                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
455                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
456                    localConnectionInfo.setClientId(localClientId);
457                    localConnectionInfo.setUserName(configuration.getUserName());
458                    localConnectionInfo.setPassword(configuration.getPassword());
459                    Transport originalTransport = remoteBroker;
460                    while (originalTransport instanceof TransportFilter) {
461                        originalTransport = ((TransportFilter) originalTransport).getNext();
462                    }
463                    if (originalTransport instanceof SslTransport) {
464                        X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
465                        localConnectionInfo.setTransportContext(peerCerts);
466                    }
467                    // sync requests that may fail
468                    Object resp = localBroker.request(localConnectionInfo);
469                    if (resp instanceof ExceptionResponse) {
470                        throw ((ExceptionResponse) resp).getException();
471                    }
472                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
473                    localBroker.oneway(localSessionInfo);
474
475                    if (configuration.isDuplex()) {
476                        // separate in-bound channel for forwards so we don't
477                        // contend with out-bound dispatch on same connection
478                        remoteBrokerInfo.setNetworkConnection(true);
479                        duplexInboundLocalBroker.oneway(remoteBrokerInfo);
480
481                        ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
482                        duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
483                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
484                                + configuration.getBrokerName());
485                        duplexLocalConnectionInfo.setUserName(configuration.getUserName());
486                        duplexLocalConnectionInfo.setPassword(configuration.getPassword());
487
488                        if (originalTransport instanceof SslTransport) {
489                            X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
490                            duplexLocalConnectionInfo.setTransportContext(peerCerts);
491                        }
492                        // sync requests that may fail
493                        resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
494                        if (resp instanceof ExceptionResponse) {
495                            throw ((ExceptionResponse) resp).getException();
496                        }
497                        SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
498                        duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
499                        duplexInboundLocalBroker.oneway(duplexInboundSession);
500                        duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
501                    }
502                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
503                    NetworkBridgeListener l = this.networkBridgeListener;
504                    if (l != null) {
505                        l.onStart(this);
506                    }
507
508                    // Let the local broker know the remote broker's ID.
509                    localBroker.oneway(remoteBrokerInfo);
510                    // new peer broker (a consumer can work with remote broker also)
511                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
512
513                    LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
514                            localBroker, remoteBroker, remoteBrokerName
515                    });
516                    LOG.trace("{} register bridge ({}) to {}", new Object[]{
517                            configuration.getBrokerName(), this, remoteBrokerName
518                    });
519                } else {
520                    LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
521                }
522                startedLatch.countDown();
523                localStartedLatch.countDown();
524            }
525        }
526    }
527
528    protected void startRemoteBridge() throws Exception {
529        if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) {
530            LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker);
531            synchronized (this) {
532                if (!isCreatedByDuplex()) {
533                    BrokerInfo brokerInfo = new BrokerInfo();
534                    brokerInfo.setBrokerName(configuration.getBrokerName());
535                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
536                    brokerInfo.setNetworkConnection(true);
537                    brokerInfo.setDuplexConnection(configuration.isDuplex());
538                    // set our properties
539                    Properties props = new Properties();
540                    IntrospectionSupport.getProperties(configuration, props, null);
541                    props.remove("networkTTL");
542                    String str = MarshallingSupport.propertiesToString(props);
543                    brokerInfo.setNetworkProperties(str);
544                    brokerInfo.setBrokerId(this.localBrokerId);
545                    remoteBroker.oneway(brokerInfo);
546                }
547                if (remoteConnectionInfo != null) {
548                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
549                }
550                remoteConnectionInfo = new ConnectionInfo();
551                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
552                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
553                remoteConnectionInfo.setUserName(configuration.getUserName());
554                remoteConnectionInfo.setPassword(configuration.getPassword());
555                remoteBroker.oneway(remoteConnectionInfo);
556
557                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
558                remoteBroker.oneway(remoteSessionInfo);
559                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
560                producerInfo.setResponseRequired(false);
561                remoteBroker.oneway(producerInfo);
562                // Listen to consumer advisory messages on the remote broker to determine demand.
563                if (!configuration.isStaticBridge()) {
564                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
565                    // always dispatch advisory message asynchronously so that
566                    // we never block the producer broker if we are slow
567                    demandConsumerInfo.setDispatchAsync(true);
568                    String advisoryTopic = configuration.getDestinationFilter();
569                    if (configuration.isBridgeTempDestinations()) {
570                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
571                    }
572                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
573                    configureConsumerPrefetch(demandConsumerInfo);
574                    remoteBroker.oneway(demandConsumerInfo);
575                }
576                startedLatch.countDown();
577            }
578        }
579    }
580
581    @Override
582    public void serviceRemoteException(Throwable error) {
583        if (!disposed.get()) {
584            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
585                LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
586                        localBroker, remoteBroker, error
587                });
588            } else {
589                LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
590                        localBroker, remoteBroker, error
591                });
592            }
593            LOG.debug("The remote Exception was: {}", error, error);
594            brokerService.getTaskRunnerFactory().execute(new Runnable() {
595                @Override
596                public void run() {
597                    ServiceSupport.dispose(getControllingService());
598                }
599            });
600            fireBridgeFailed(error);
601        }
602    }
603
604    protected void serviceRemoteCommand(Command command) {
605        if (!disposed.get()) {
606            try {
607                if (command.isMessageDispatch()) {
608                    safeWaitUntilStarted();
609                    MessageDispatch md = (MessageDispatch) command;
610                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
611                    ackAdvisory(md.getMessage());
612                } else if (command.isBrokerInfo()) {
613                    futureRemoteBrokerInfo.set((BrokerInfo) command);
614                } else if (command.getClass() == ConnectionError.class) {
615                    ConnectionError ce = (ConnectionError) command;
616                    serviceRemoteException(ce.getException());
617                } else {
618                    if (isDuplex()) {
619                        LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
620                        if (command.isMessage()) {
621                            final ActiveMQMessage message = (ActiveMQMessage) command;
622                            if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
623                                serviceRemoteConsumerAdvisory(message.getDataStructure());
624                                ackAdvisory(message);
625                            } else {
626                                if (!isPermissableDestination(message.getDestination(), true)) {
627                                    return;
628                                }
629                                // message being forwarded - we need to
630                                // propagate the response to our local send
631                                if (canDuplexDispatch(message)) {
632                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
633                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
634                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
635                                            final int correlationId = message.getCommandId();
636
637                                            @Override
638                                            public void onCompletion(FutureResponse resp) {
639                                                try {
640                                                    Response reply = resp.getResult();
641                                                    reply.setCorrelationId(correlationId);
642                                                    remoteBroker.oneway(reply);
643                                                    //increment counter when messages are received in duplex mode
644                                                    networkBridgeStatistics.getReceivedCount().increment();
645                                                } catch (IOException error) {
646                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
647                                                    serviceRemoteException(error);
648                                                }
649                                            }
650                                        });
651                                    } else {
652                                        duplexInboundLocalBroker.oneway(message);
653                                        networkBridgeStatistics.getReceivedCount().increment();
654                                    }
655                                    serviceInboundMessage(message);
656                                } else {
657                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
658                                        Response reply = new Response();
659                                        reply.setCorrelationId(message.getCommandId());
660                                        remoteBroker.oneway(reply);
661                                    }
662                                }
663                            }
664                        } else {
665                            switch (command.getDataStructureType()) {
666                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
667                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
668                                        // end of initiating connection setup - propogate to initial connection to get mbean by clientid
669                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
670                                    } else {
671                                        localBroker.oneway(command);
672                                    }
673                                    break;
674                                case SessionInfo.DATA_STRUCTURE_TYPE:
675                                    localBroker.oneway(command);
676                                    break;
677                                case ProducerInfo.DATA_STRUCTURE_TYPE:
678                                    // using duplexInboundLocalProducerInfo
679                                    break;
680                                case MessageAck.DATA_STRUCTURE_TYPE:
681                                    MessageAck ack = (MessageAck) command;
682                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
683                                    if (localSub != null) {
684                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
685                                        localBroker.oneway(ack);
686                                    } else {
687                                        LOG.warn("Matching local subscription not found for ack: {}", ack);
688                                    }
689                                    break;
690                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
691                                    localStartedLatch.await();
692                                    if (started.get()) {
693                                        addConsumerInfo((ConsumerInfo) command);
694                                    } else {
695                                        // received a subscription whilst stopping
696                                        LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
697                                    }
698                                    break;
699                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
700                                    // initiator is shutting down, controlled case
701                                    // abortive close dealt with by inactivity monitor
702                                    LOG.info("Stopping network bridge on shutdown of remote broker");
703                                    serviceRemoteException(new IOException(command.toString()));
704                                    break;
705                                default:
706                                    LOG.debug("Ignoring remote command: {}", command);
707                            }
708                        }
709                    } else {
710                        switch (command.getDataStructureType()) {
711                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
712                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
713                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
714                                break;
715                            default:
716                                LOG.warn("Unexpected remote command: {}", command);
717                        }
718                    }
719                }
720            } catch (Throwable e) {
721                LOG.debug("Exception processing remote command: {}", command, e);
722                serviceRemoteException(e);
723            }
724        }
725    }
726
727    private void ackAdvisory(Message message) throws IOException {
728        demandConsumerDispatched++;
729        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
730                (configuration.getAdvisoryAckPercentage() / 100f))) {
731            MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
732            ack.setConsumerId(demandConsumerInfo.getConsumerId());
733            remoteBroker.oneway(ack);
734            demandConsumerDispatched = 0;
735        }
736    }
737
738    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
739        final int networkTTL = configuration.getConsumerTTL();
740        if (data.getClass() == ConsumerInfo.class) {
741            // Create a new local subscription
742            ConsumerInfo info = (ConsumerInfo) data;
743            BrokerId[] path = info.getBrokerPath();
744
745            if (info.isBrowser()) {
746                LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
747                return;
748            }
749
750            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
751                LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
752                        configuration.getBrokerName(), remoteBrokerName, networkTTL, info
753                });
754                return;
755            }
756
757            if (contains(path, localBrokerPath[0])) {
758                // Ignore this consumer as it's a consumer we locally sent to the broker.
759                LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
760                        configuration.getBrokerName(), remoteBrokerName, info
761                });
762                return;
763            }
764
765            if (!isPermissableDestination(info.getDestination())) {
766                // ignore if not in the permitted or in the excluded list
767                LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
768                        configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
769                });
770                return;
771            }
772
773            // in a cyclic network there can be multiple bridges per broker that can propagate
774            // a network subscription so there is a need to synchronize on a shared entity
775            synchronized (brokerService.getVmConnectorURI()) {
776                addConsumerInfo(info);
777            }
778        } else if (data.getClass() == DestinationInfo.class) {
779            // It's a destination info - we want to pass up information about temporary destinations
780            final DestinationInfo destInfo = (DestinationInfo) data;
781            BrokerId[] path = destInfo.getBrokerPath();
782            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
783                LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
784                        configuration.getBrokerName(), destInfo, networkTTL
785                });
786                return;
787            }
788            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
789                LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
790                return;
791            }
792            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
793            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
794                // re-set connection id so comes from here
795                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
796                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
797            }
798            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
799            LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
800                    configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
801            });
802            if (destInfo.isRemoveOperation()) {
803                // Serialize with removeSub operations such that all removeSub advisories
804                // are generated
805                serialExecutor.execute(new Runnable() {
806                    @Override
807                    public void run() {
808                        try {
809                            localBroker.oneway(destInfo);
810                        } catch (IOException e) {
811                            LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
812                        }
813                    }
814                });
815            } else {
816                localBroker.oneway(destInfo);
817            }
818        } else if (data.getClass() == RemoveInfo.class) {
819            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
820            removeDemandSubscription(id);
821        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
822            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
823            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
824            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
825                DemandSubscription ds = i.next();
826                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
827                if (removed) {
828                    if (ds.getDurableRemoteSubs().isEmpty()) {
829
830                        // deactivate subscriber
831                        RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
832                        localBroker.oneway(removeInfo);
833
834                        // remove subscriber
835                        RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
836                        sending.setClientId(localClientId);
837                        sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
838                        sending.setConnectionId(this.localConnectionInfo.getConnectionId());
839                        localBroker.oneway(sending);
840
841                        //remove subscriber from map
842                        i.remove();
843                    }
844                }
845            }
846        }
847    }
848
849    @Override
850    public void serviceLocalException(Throwable error) {
851        serviceLocalException(null, error);
852    }
853
854    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
855        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
856        if (!disposed.get()) {
857            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
858                // not a reason to terminate the bridge - temps can disappear with
859                // pending sends as the demand sub may outlive the remote dest
860                if (messageDispatch != null) {
861                    LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
862                    try {
863                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
864                        poisonAck.setPoisonCause(error);
865                        localBroker.oneway(poisonAck);
866                    } catch (IOException ioe) {
867                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
868                    }
869                    fireFailedForwardAdvisory(messageDispatch, error);
870                } else {
871                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
872                }
873                return;
874            }
875
876            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
877            LOG.debug("The local Exception was: {}", error, error);
878
879            brokerService.getTaskRunnerFactory().execute(new Runnable() {
880                @Override
881                public void run() {
882                    ServiceSupport.dispose(getControllingService());
883                }
884            });
885            fireBridgeFailed(error);
886        }
887    }
888
889    private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
890        if (configuration.isAdvisoryForFailedForward()) {
891            AdvisoryBroker advisoryBroker = null;
892            try {
893                advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
894
895                if (advisoryBroker != null) {
896                    ConnectionContext context = new ConnectionContext();
897                    context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
898                    context.setBroker(brokerService.getBroker());
899
900                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
901                    advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
902                    advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
903                            advisoryMessage);
904
905                }
906            } catch (Exception e) {
907                LOG.warn("failed to fire forward failure advisory, cause: {}", e);
908                LOG.debug("detail", e);
909            }
910        }
911    }
912
913    protected Service getControllingService() {
914        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
915    }
916
917    protected void addSubscription(DemandSubscription sub) throws IOException {
918        if (sub != null) {
919            if (isDuplex()) {
920                // async vm transport, need to wait for completion
921                localBroker.request(sub.getLocalInfo());
922            } else {
923                localBroker.oneway(sub.getLocalInfo());
924            }
925        }
926    }
927
928    protected void removeSubscription(final DemandSubscription sub) throws IOException {
929        if (sub != null) {
930            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
931
932            // ensure not available for conduit subs pending removal
933            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
934            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
935
936            // continue removal in separate thread to free up this thread for outstanding responses
937            // Serialize with removeDestination operations so that removeSubs are serialized with
938            // removeDestinations such that all removeSub advisories are generated
939            serialExecutor.execute(new Runnable() {
940                @Override
941                public void run() {
942                    sub.waitForCompletion();
943                    try {
944                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
945                    } catch (IOException e) {
946                        LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
947                    }
948                }
949            });
950        }
951    }
952
953    protected Message configureMessage(MessageDispatch md) throws IOException {
954        Message message = md.getMessage().copy();
955        // Update the packet to show where it came from.
956        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
957        message.setProducerId(producerInfo.getProducerId());
958        message.setDestination(md.getDestination());
959        message.setMemoryUsage(null);
960        if (message.getOriginalTransactionId() == null) {
961            message.setOriginalTransactionId(message.getTransactionId());
962        }
963        message.setTransactionId(null);
964        if (configuration.isUseCompression()) {
965            message.compress();
966        }
967        return message;
968    }
969
970    protected void serviceLocalCommand(Command command) {
971        if (!disposed.get()) {
972            try {
973                if (command.isMessageDispatch()) {
974                    safeWaitUntilStarted();
975                    networkBridgeStatistics.getEnqueues().increment();
976                    final MessageDispatch md = (MessageDispatch) command;
977                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
978                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
979
980                        if (suppressMessageDispatch(md, sub)) {
981                            LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
982                                    configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
983                            });
984                            // still ack as it may be durable
985                            try {
986                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
987                            } finally {
988                                sub.decrementOutstandingResponses();
989                            }
990                            return;
991                        }
992
993                        Message message = configureMessage(md);
994                        LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
995                                configuration.getBrokerName(), remoteBrokerName, (LOG.isTraceEnabled() ? message : message.getMessageId()), md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), message
996                        });
997
998                        if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
999                            try {
1000                                // never request b/c they are eventually acked async
1001                                remoteBroker.oneway(message);
1002                            } finally {
1003                                sub.decrementOutstandingResponses();
1004                            }
1005                            return;
1006                        }
1007
1008                        if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
1009
1010                            // The message was not sent using async send, so we should only
1011                            // ack the local broker when we get confirmation that the remote
1012                            // broker has received the message.
1013                            remoteBroker.asyncRequest(message, new ResponseCallback() {
1014                                @Override
1015                                public void onCompletion(FutureResponse future) {
1016                                    try {
1017                                        Response response = future.getResult();
1018                                        if (response.isException()) {
1019                                            ExceptionResponse er = (ExceptionResponse) response;
1020                                            serviceLocalException(md, er.getException());
1021                                        } else {
1022                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1023                                            networkBridgeStatistics.getDequeues().increment();
1024                                        }
1025                                    } catch (IOException e) {
1026                                        serviceLocalException(md, e);
1027                                    } finally {
1028                                        sub.decrementOutstandingResponses();
1029                                    }
1030                                }
1031                            });
1032
1033                        } else {
1034                            // If the message was originally sent using async send, we will
1035                            // preserve that QOS by bridging it using an async send (small chance
1036                            // of message loss).
1037                            try {
1038                                remoteBroker.oneway(message);
1039                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1040                                networkBridgeStatistics.getDequeues().increment();
1041                            } finally {
1042                                sub.decrementOutstandingResponses();
1043                            }
1044                        }
1045                        serviceOutbound(message);
1046                    } else {
1047                        LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
1048                    }
1049                } else if (command.isBrokerInfo()) {
1050                    futureLocalBrokerInfo.set((BrokerInfo) command);
1051                } else if (command.isShutdownInfo()) {
1052                    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
1053                    stop();
1054                } else if (command.getClass() == ConnectionError.class) {
1055                    ConnectionError ce = (ConnectionError) command;
1056                    serviceLocalException(ce.getException());
1057                } else {
1058                    switch (command.getDataStructureType()) {
1059                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
1060                            break;
1061                        default:
1062                            LOG.warn("Unexpected local command: {}", command);
1063                    }
1064                }
1065            } catch (Throwable e) {
1066                LOG.warn("Caught an exception processing local command", e);
1067                serviceLocalException(e);
1068            }
1069        }
1070    }
1071
1072    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1073        boolean suppress = false;
1074        // for durable subs, suppression via filter leaves dangling acks so we
1075        // need to check here and allow the ack irrespective
1076        if (sub.getLocalInfo().isDurable()) {
1077            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
1078            messageEvalContext.setMessageReference(md.getMessage());
1079            messageEvalContext.setDestination(md.getDestination());
1080            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1081        }
1082        return suppress;
1083    }
1084
1085    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1086        if (brokerPath != null) {
1087            for (BrokerId id : brokerPath) {
1088                if (brokerId.equals(id)) {
1089                    return true;
1090                }
1091            }
1092        }
1093        return false;
1094    }
1095
1096    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1097        if (brokerPath == null || brokerPath.length == 0) {
1098            return pathsToAppend;
1099        }
1100        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1101        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1102        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1103        return rc;
1104    }
1105
1106    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1107        if (brokerPath == null || brokerPath.length == 0) {
1108            return new BrokerId[]{idToAppend};
1109        }
1110        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1111        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1112        rc[brokerPath.length] = idToAppend;
1113        return rc;
1114    }
1115
1116    protected boolean isPermissableDestination(ActiveMQDestination destination) {
1117        return isPermissableDestination(destination, false);
1118    }
1119
1120    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1121        // Are we not bridging temporary destinations?
1122        if (destination.isTemporary()) {
1123            if (allowTemporary) {
1124                return true;
1125            } else {
1126                return configuration.isBridgeTempDestinations();
1127            }
1128        }
1129
1130        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1131        if (dests != null && dests.length > 0) {
1132            for (ActiveMQDestination dest : dests) {
1133                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1134                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1135                    return true;
1136                }
1137            }
1138        }
1139
1140        dests = excludedDestinations;
1141        if (dests != null && dests.length > 0) {
1142            for (ActiveMQDestination dest : dests) {
1143                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1144                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1145                    return false;
1146                }
1147            }
1148        }
1149
1150        dests = dynamicallyIncludedDestinations;
1151        if (dests != null && dests.length > 0) {
1152            for (ActiveMQDestination dest : dests) {
1153                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1154                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1155                    return true;
1156                }
1157            }
1158
1159            return false;
1160        }
1161        return true;
1162    }
1163
1164    /**
1165     * Subscriptions for these destinations are always created
1166     */
1167    protected void setupStaticDestinations() {
1168        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1169        if (dests != null) {
1170            for (ActiveMQDestination dest : dests) {
1171                DemandSubscription sub = createDemandSubscription(dest);
1172                sub.setStaticallyIncluded(true);
1173                try {
1174                    addSubscription(sub);
1175                } catch (IOException e) {
1176                    LOG.error("Failed to add static destination {}", dest, e);
1177                }
1178                LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
1179            }
1180        }
1181    }
1182
1183    protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1184        ConsumerInfo info = consumerInfo.copy();
1185        addRemoteBrokerToBrokerPath(info);
1186        DemandSubscription sub = createDemandSubscription(info);
1187        if (sub != null) {
1188            if (duplicateSuppressionIsRequired(sub)) {
1189                undoMapRegistration(sub);
1190            } else {
1191                if (consumerInfo.isDurable()) {
1192                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
1193                }
1194                addSubscription(sub);
1195                LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);
1196            }
1197        }
1198    }
1199
1200    private void undoMapRegistration(DemandSubscription sub) {
1201        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1202        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1203    }
1204
1205    /*
1206     * check our existing subs networkConsumerIds against the list of network
1207     * ids in this subscription A match means a duplicate which we suppress for
1208     * topics and maybe for queues
1209     */
1210    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1211        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1212        boolean suppress = false;
1213
1214        if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
1215                && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1216            return suppress;
1217        }
1218
1219        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1220        Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1221        for (Subscription sub : currentSubs) {
1222            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1223            if (!networkConsumers.isEmpty()) {
1224                if (matchFound(candidateConsumers, networkConsumers)) {
1225                    if (isInActiveDurableSub(sub)) {
1226                        suppress = false;
1227                    } else {
1228                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1229                    }
1230                    break;
1231                }
1232            }
1233        }
1234        return suppress;
1235    }
1236
1237    private boolean isInActiveDurableSub(Subscription sub) {
1238        return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1239    }
1240
1241    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1242        boolean suppress = false;
1243
1244        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1245            LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
1246                    configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
1247            });
1248            suppress = true;
1249        } else {
1250            // remove the existing lower priority duplicate and allow this candidate
1251            try {
1252                removeDuplicateSubscription(existingSub);
1253
1254                LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
1255                        configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
1256                });
1257            } catch (IOException e) {
1258                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
1259            }
1260        }
1261        return suppress;
1262    }
1263
1264    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1265        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1266            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1267                break;
1268            }
1269        }
1270    }
1271
1272    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1273        boolean found = false;
1274        for (ConsumerId aliasConsumer : networkConsumers) {
1275            if (candidateConsumers.contains(aliasConsumer)) {
1276                found = true;
1277                break;
1278            }
1279        }
1280        return found;
1281    }
1282
1283    protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1284        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1285        Region region;
1286        Collection<Subscription> subs;
1287
1288        region = null;
1289        switch (dest.getDestinationType()) {
1290            case ActiveMQDestination.QUEUE_TYPE:
1291                region = region_broker.getQueueRegion();
1292                break;
1293            case ActiveMQDestination.TOPIC_TYPE:
1294                region = region_broker.getTopicRegion();
1295                break;
1296            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1297                region = region_broker.getTempQueueRegion();
1298                break;
1299            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1300                region = region_broker.getTempTopicRegion();
1301                break;
1302        }
1303
1304        if (region instanceof AbstractRegion) {
1305            subs = ((AbstractRegion) region).getSubscriptions().values();
1306        } else {
1307            subs = null;
1308        }
1309
1310        return subs;
1311    }
1312
1313    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1314        // add our original id to ourselves
1315        info.addNetworkConsumerId(info.getConsumerId());
1316        return doCreateDemandSubscription(info);
1317    }
1318
1319    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1320        DemandSubscription result = new DemandSubscription(info);
1321        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1322        if (info.getDestination().isTemporary()) {
1323            // reset the local connection Id
1324            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1325            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1326        }
1327
1328        if (configuration.isDecreaseNetworkConsumerPriority()) {
1329            byte priority = (byte) configuration.getConsumerPriorityBase();
1330            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1331                // The longer the path to the consumer, the less it's consumer priority.
1332                priority -= info.getBrokerPath().length + 1;
1333            }
1334            result.getLocalInfo().setPriority(priority);
1335            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
1336        }
1337        configureDemandSubscription(info, result);
1338        return result;
1339    }
1340
1341    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1342        ConsumerInfo info = new ConsumerInfo();
1343        info.setNetworkSubscription(true);
1344        info.setDestination(destination);
1345
1346        // Indicate that this subscription is being made on behalf of the remote broker.
1347        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
1348
1349        // the remote info held by the DemandSubscription holds the original
1350        // consumerId, the local info get's overwritten
1351        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1352        DemandSubscription result = null;
1353        try {
1354            result = createDemandSubscription(info);
1355        } catch (IOException e) {
1356            LOG.error("Failed to create DemandSubscription ", e);
1357        }
1358        return result;
1359    }
1360
1361    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1362        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
1363                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
1364            sub.getLocalInfo().setDispatchAsync(true);
1365        } else {
1366            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1367        }
1368        configureConsumerPrefetch(sub.getLocalInfo());
1369        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1370        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1371
1372        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1373        if (!info.isDurable()) {
1374            // This works for now since we use a VM connection to the local broker.
1375            // may need to change if we ever subscribe to a remote broker.
1376            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1377        } else {
1378            sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
1379        }
1380    }
1381
1382    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1383        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1384        LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
1385                configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
1386        });
1387        if (sub != null) {
1388            removeSubscription(sub);
1389            LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
1390                    configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
1391            });
1392        }
1393    }
1394
1395    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1396        boolean removeDone = false;
1397        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1398        if (sub != null) {
1399            try {
1400                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1401                removeDone = true;
1402            } catch (IOException e) {
1403                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e);
1404            }
1405        }
1406        return removeDone;
1407    }
1408
1409    /**
1410     * Performs a timed wait on the started latch and then checks for disposed
1411     * before performing another wait each time the the started wait times out.
1412     */
1413    protected boolean safeWaitUntilStarted() throws InterruptedException {
1414        while (!disposed.get()) {
1415            if (startedLatch.await(1, TimeUnit.SECONDS)) {
1416                break;
1417            }
1418        }
1419        return !disposed.get();
1420    }
1421
1422    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1423        NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1424        if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1425            PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1426            if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1427                filterFactory = entry.getNetworkBridgeFilterFactory();
1428            }
1429        }
1430        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
1431    }
1432
1433    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1434        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1435    }
1436
1437    protected BrokerId[] getRemoteBrokerPath() {
1438        return remoteBrokerPath;
1439    }
1440
1441    @Override
1442    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1443        this.networkBridgeListener = listener;
1444    }
1445
1446    private void fireBridgeFailed(Throwable reason) {
1447        LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
1448        NetworkBridgeListener l = this.networkBridgeListener;
1449        if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1450            l.bridgeFailed();
1451        }
1452    }
1453
1454    /**
1455     * @return Returns the dynamicallyIncludedDestinations.
1456     */
1457    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1458        return dynamicallyIncludedDestinations;
1459    }
1460
1461    /**
1462     * @param dynamicallyIncludedDestinations
1463     *         The dynamicallyIncludedDestinations to set.
1464     */
1465    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1466        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1467    }
1468
1469    /**
1470     * @return Returns the excludedDestinations.
1471     */
1472    public ActiveMQDestination[] getExcludedDestinations() {
1473        return excludedDestinations;
1474    }
1475
1476    /**
1477     * @param excludedDestinations The excludedDestinations to set.
1478     */
1479    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1480        this.excludedDestinations = excludedDestinations;
1481    }
1482
1483    /**
1484     * @return Returns the staticallyIncludedDestinations.
1485     */
1486    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1487        return staticallyIncludedDestinations;
1488    }
1489
1490    /**
1491     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
1492     */
1493    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1494        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1495    }
1496
1497    /**
1498     * @return Returns the durableDestinations.
1499     */
1500    public ActiveMQDestination[] getDurableDestinations() {
1501        return durableDestinations;
1502    }
1503
1504    /**
1505     * @param durableDestinations The durableDestinations to set.
1506     */
1507    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1508        this.durableDestinations = durableDestinations;
1509    }
1510
1511    /**
1512     * @return Returns the localBroker.
1513     */
1514    public Transport getLocalBroker() {
1515        return localBroker;
1516    }
1517
1518    /**
1519     * @return Returns the remoteBroker.
1520     */
1521    public Transport getRemoteBroker() {
1522        return remoteBroker;
1523    }
1524
1525    /**
1526     * @return the createdByDuplex
1527     */
1528    public boolean isCreatedByDuplex() {
1529        return this.createdByDuplex;
1530    }
1531
1532    /**
1533     * @param createdByDuplex the createdByDuplex to set
1534     */
1535    public void setCreatedByDuplex(boolean createdByDuplex) {
1536        this.createdByDuplex = createdByDuplex;
1537    }
1538
1539    @Override
1540    public String getRemoteAddress() {
1541        return remoteBroker.getRemoteAddress();
1542    }
1543
1544    @Override
1545    public String getLocalAddress() {
1546        return localBroker.getRemoteAddress();
1547    }
1548
1549    @Override
1550    public String getRemoteBrokerName() {
1551        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1552    }
1553
1554    @Override
1555    public String getRemoteBrokerId() {
1556        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
1557    }
1558
1559    @Override
1560    public String getLocalBrokerName() {
1561        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1562    }
1563
1564    @Override
1565    public long getDequeueCounter() {
1566        return networkBridgeStatistics.getDequeues().getCount();
1567    }
1568
1569    @Override
1570    public long getEnqueueCounter() {
1571        return networkBridgeStatistics.getEnqueues().getCount();
1572    }
1573
1574    @Override
1575    public NetworkBridgeStatistics getNetworkBridgeStatistics() {
1576        return networkBridgeStatistics;
1577    }
1578
1579    protected boolean isDuplex() {
1580        return configuration.isDuplex() || createdByDuplex;
1581    }
1582
1583    public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1584        return subscriptionMapByRemoteId;
1585    }
1586
1587    @Override
1588    public void setBrokerService(BrokerService brokerService) {
1589        this.brokerService = brokerService;
1590        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1591        localBrokerPath[0] = localBrokerId;
1592    }
1593
1594    @Override
1595    public void setMbeanObjectName(ObjectName objectName) {
1596        this.mbeanObjectName = objectName;
1597    }
1598
1599    @Override
1600    public ObjectName getMbeanObjectName() {
1601        return mbeanObjectName;
1602    }
1603
1604    @Override
1605    public void resetStats() {
1606        networkBridgeStatistics.reset();
1607    }
1608
1609    /*
1610     * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1611     * remote sides of the network bridge.
1612     */
1613    private static class FutureBrokerInfo implements Future<BrokerInfo> {
1614
1615        private final CountDownLatch slot = new CountDownLatch(1);
1616        private final AtomicBoolean disposed;
1617        private volatile BrokerInfo info = null;
1618
1619        public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1620            this.info = info;
1621            this.disposed = disposed;
1622        }
1623
1624        @Override
1625        public boolean cancel(boolean mayInterruptIfRunning) {
1626            slot.countDown();
1627            return true;
1628        }
1629
1630        @Override
1631        public boolean isCancelled() {
1632            return slot.getCount() == 0 && info == null;
1633        }
1634
1635        @Override
1636        public boolean isDone() {
1637            return info != null;
1638        }
1639
1640        @Override
1641        public BrokerInfo get() throws InterruptedException, ExecutionException {
1642            try {
1643                if (info == null) {
1644                    while (!disposed.get()) {
1645                        if (slot.await(1, TimeUnit.SECONDS)) {
1646                            break;
1647                        }
1648                    }
1649                }
1650                return info;
1651            } catch (InterruptedException e) {
1652                Thread.currentThread().interrupt();
1653                LOG.debug("Operation interrupted: {}", e, e);
1654                throw new InterruptedException("Interrupted.");
1655            }
1656        }
1657
1658        @Override
1659        public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1660            try {
1661                if (info == null) {
1662                    long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1663
1664                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
1665                        if (slot.await(1, TimeUnit.MILLISECONDS)) {
1666                            break;
1667                        }
1668                    }
1669                    if (info == null) {
1670                        throw new TimeoutException();
1671                    }
1672                }
1673                return info;
1674            } catch (InterruptedException e) {
1675                throw new InterruptedException("Interrupted.");
1676            }
1677        }
1678
1679        public void set(BrokerInfo info) {
1680            this.info = info;
1681            this.slot.countDown();
1682        }
1683    }
1684
1685    protected void serviceOutbound(Message message) {
1686        NetworkBridgeListener l = this.networkBridgeListener;
1687        if (l != null) {
1688            l.onOutboundMessage(this, message);
1689        }
1690    }
1691
1692    protected void serviceInboundMessage(Message message) {
1693        NetworkBridgeListener l = this.networkBridgeListener;
1694        if (l != null) {
1695            l.onInboundMessage(this, message);
1696        }
1697    }
1698
1699    protected boolean canDuplexDispatch(Message message) {
1700        boolean result = true;
1701        if (configuration.isCheckDuplicateMessagesOnDuplex()){
1702            final long producerSequenceId = message.getMessageId().getProducerSequenceId();
1703            //  messages are multiplexed on this producer so we need to query the persistenceAdapter
1704            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
1705            if (producerSequenceId <= lastStoredForMessageProducer) {
1706                result = false;
1707                LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
1708                        (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
1709                });
1710            }
1711        }
1712        return result;
1713    }
1714
1715    protected long getStoredSequenceIdForMessage(MessageId messageId) {
1716        try {
1717            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
1718        } catch (IOException ignored) {
1719            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
1720        }
1721        return -1;
1722    }
1723
1724    protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) {
1725        //If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly
1726        //set then use it, else default to the prefetchSize setting
1727        if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) &&
1728                configuration.getAdvisoryPrefetchSize() > 0) {
1729            consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize());
1730        } else {
1731            consumerInfo.setPrefetchSize(configuration.getPrefetchSize());
1732        }
1733    }
1734
1735}