001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.io.IOException;
020import java.security.GeneralSecurityException;
021import java.security.cert.X509Certificate;
022import java.util.Arrays;
023import java.util.Collection;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Properties;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029import java.util.concurrent.CountDownLatch;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Executors;
033import java.util.concurrent.Future;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.TimeoutException;
036import java.util.concurrent.atomic.AtomicBoolean;
037
038import javax.management.ObjectName;
039
040import org.apache.activemq.DestinationDoesNotExistException;
041import org.apache.activemq.Service;
042import org.apache.activemq.advisory.AdvisoryBroker;
043import org.apache.activemq.advisory.AdvisorySupport;
044import org.apache.activemq.broker.BrokerService;
045import org.apache.activemq.broker.BrokerServiceAware;
046import org.apache.activemq.broker.ConnectionContext;
047import org.apache.activemq.broker.TransportConnection;
048import org.apache.activemq.broker.region.AbstractRegion;
049import org.apache.activemq.broker.region.DurableTopicSubscription;
050import org.apache.activemq.broker.region.Region;
051import org.apache.activemq.broker.region.RegionBroker;
052import org.apache.activemq.broker.region.Subscription;
053import org.apache.activemq.broker.region.policy.PolicyEntry;
054import org.apache.activemq.command.ActiveMQDestination;
055import org.apache.activemq.command.ActiveMQMessage;
056import org.apache.activemq.command.ActiveMQTempDestination;
057import org.apache.activemq.command.ActiveMQTopic;
058import org.apache.activemq.command.BrokerId;
059import org.apache.activemq.command.BrokerInfo;
060import org.apache.activemq.command.Command;
061import org.apache.activemq.command.ConnectionError;
062import org.apache.activemq.command.ConnectionId;
063import org.apache.activemq.command.ConnectionInfo;
064import org.apache.activemq.command.ConsumerId;
065import org.apache.activemq.command.ConsumerInfo;
066import org.apache.activemq.command.DataStructure;
067import org.apache.activemq.command.DestinationInfo;
068import org.apache.activemq.command.ExceptionResponse;
069import org.apache.activemq.command.KeepAliveInfo;
070import org.apache.activemq.command.Message;
071import org.apache.activemq.command.MessageAck;
072import org.apache.activemq.command.MessageDispatch;
073import org.apache.activemq.command.MessageId;
074import org.apache.activemq.command.NetworkBridgeFilter;
075import org.apache.activemq.command.ProducerInfo;
076import org.apache.activemq.command.RemoveInfo;
077import org.apache.activemq.command.RemoveSubscriptionInfo;
078import org.apache.activemq.command.Response;
079import org.apache.activemq.command.SessionInfo;
080import org.apache.activemq.command.ShutdownInfo;
081import org.apache.activemq.command.SubscriptionInfo;
082import org.apache.activemq.command.WireFormatInfo;
083import org.apache.activemq.filter.DestinationFilter;
084import org.apache.activemq.filter.MessageEvaluationContext;
085import org.apache.activemq.security.SecurityContext;
086import org.apache.activemq.transport.DefaultTransportListener;
087import org.apache.activemq.transport.FutureResponse;
088import org.apache.activemq.transport.ResponseCallback;
089import org.apache.activemq.transport.Transport;
090import org.apache.activemq.transport.TransportDisposedIOException;
091import org.apache.activemq.transport.TransportFilter;
092import org.apache.activemq.transport.tcp.SslTransport;
093import org.apache.activemq.util.IdGenerator;
094import org.apache.activemq.util.IntrospectionSupport;
095import org.apache.activemq.util.LongSequenceGenerator;
096import org.apache.activemq.util.MarshallingSupport;
097import org.apache.activemq.util.ServiceStopper;
098import org.apache.activemq.util.ServiceSupport;
099import org.slf4j.Logger;
100import org.slf4j.LoggerFactory;
101
102/**
103 * A useful base class for implementing demand forwarding bridges.
104 */
105public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
106    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
107    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
108    protected final Transport localBroker;
109    protected final Transport remoteBroker;
110    protected IdGenerator idGenerator = new IdGenerator();
111    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
112    protected ConnectionInfo localConnectionInfo;
113    protected ConnectionInfo remoteConnectionInfo;
114    protected SessionInfo localSessionInfo;
115    protected ProducerInfo producerInfo;
116    protected String remoteBrokerName = "Unknown";
117    protected String localClientId;
118    protected ConsumerInfo demandConsumerInfo;
119    protected int demandConsumerDispatched;
120    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
121    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
122    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
123    protected final AtomicBoolean disposed = new AtomicBoolean();
124    protected BrokerId localBrokerId;
125    protected ActiveMQDestination[] excludedDestinations;
126    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
127    protected ActiveMQDestination[] staticallyIncludedDestinations;
128    protected ActiveMQDestination[] durableDestinations;
129    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
130    protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
131    protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
132    protected final CountDownLatch startedLatch = new CountDownLatch(2);
133    protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
134    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
135    protected NetworkBridgeConfiguration configuration;
136    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
137
138    protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
139    protected BrokerId remoteBrokerId;
140
141    protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics();
142
143    private NetworkBridgeListener networkBridgeListener;
144    private boolean createdByDuplex;
145    private BrokerInfo localBrokerInfo;
146    private BrokerInfo remoteBrokerInfo;
147
148    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
149    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);
150
151    private final AtomicBoolean started = new AtomicBoolean();
152    private TransportConnection duplexInitiatingConnection;
153    private final AtomicBoolean duplexInitiatingConnectionInfoReceived = new AtomicBoolean();
154    protected BrokerService brokerService = null;
155    private ObjectName mbeanObjectName;
156    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
157    private Transport duplexInboundLocalBroker = null;
158    private ProducerInfo duplexInboundLocalProducerInfo;
159
160    public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
161        this.configuration = configuration;
162        this.localBroker = localBroker;
163        this.remoteBroker = remoteBroker;
164    }
165
166    public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
167        this.localBrokerInfo = localBrokerInfo;
168        this.remoteBrokerInfo = remoteBrokerInfo;
169        this.duplexInitiatingConnection = connection;
170        start();
171        serviceRemoteCommand(remoteBrokerInfo);
172    }
173
174    @Override
175    public void start() throws Exception {
176        if (started.compareAndSet(false, true)) {
177
178            if (brokerService == null) {
179                throw new IllegalArgumentException("BrokerService is null on " + this);
180            }
181
182            networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
183
184            if (isDuplex()) {
185                duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
186                duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
187
188                    @Override
189                    public void onCommand(Object o) {
190                        Command command = (Command) o;
191                        serviceLocalCommand(command);
192                    }
193
194                    @Override
195                    public void onException(IOException error) {
196                        serviceLocalException(error);
197                    }
198                });
199                duplexInboundLocalBroker.start();
200            }
201
202            localBroker.setTransportListener(new DefaultTransportListener() {
203
204                @Override
205                public void onCommand(Object o) {
206                    Command command = (Command) o;
207                    serviceLocalCommand(command);
208                }
209
210                @Override
211                public void onException(IOException error) {
212                    if (!futureLocalBrokerInfo.isDone()) {
213                        futureLocalBrokerInfo.cancel(true);
214                        return;
215                    }
216                    serviceLocalException(error);
217                }
218            });
219
220            remoteBroker.setTransportListener(new DefaultTransportListener() {
221
222                @Override
223                public void onCommand(Object o) {
224                    Command command = (Command) o;
225                    serviceRemoteCommand(command);
226                }
227
228                @Override
229                public void onException(IOException error) {
230                    if (!futureRemoteBrokerInfo.isDone()) {
231                        futureRemoteBrokerInfo.cancel(true);
232                        return;
233                    }
234                    serviceRemoteException(error);
235                }
236            });
237
238            remoteBroker.start();
239            localBroker.start();
240
241            if (!disposed.get()) {
242                try {
243                    triggerStartAsyncNetworkBridgeCreation();
244                } catch (IOException e) {
245                    LOG.warn("Caught exception from remote start", e);
246                }
247            } else {
248                LOG.warn("Bridge was disposed before the start() method was fully executed.");
249                throw new TransportDisposedIOException();
250            }
251        }
252    }
253
254    @Override
255    public void stop() throws Exception {
256        if (started.compareAndSet(true, false)) {
257            if (disposed.compareAndSet(false, true)) {
258                LOG.debug(" stopping {} bridge to {}", configuration.getBrokerName(), remoteBrokerName);
259
260                futureRemoteBrokerInfo.cancel(true);
261                futureLocalBrokerInfo.cancel(true);
262
263                NetworkBridgeListener l = this.networkBridgeListener;
264                if (l != null) {
265                    l.onStop(this);
266                }
267                try {
268                    // local start complete
269                    if (startedLatch.getCount() < 2) {
270                        LOG.trace("{} unregister bridge ({}) to {}", new Object[]{
271                                configuration.getBrokerName(), this, remoteBrokerName
272                        });
273                        brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
274                        brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
275                    }
276
277                    remoteBridgeStarted.set(false);
278                    final CountDownLatch sendShutdown = new CountDownLatch(1);
279
280                    brokerService.getTaskRunnerFactory().execute(new Runnable() {
281                        @Override
282                        public void run() {
283                            try {
284                                serialExecutor.shutdown();
285                                if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
286                                    List<Runnable> pendingTasks = serialExecutor.shutdownNow();
287                                    LOG.info("pending tasks on stop {}", pendingTasks);
288                                }
289                                localBroker.oneway(new ShutdownInfo());
290                                remoteBroker.oneway(new ShutdownInfo());
291                            } catch (Throwable e) {
292                                LOG.debug("Caught exception sending shutdown", e);
293                            } finally {
294                                sendShutdown.countDown();
295                            }
296
297                        }
298                    }, "ActiveMQ ForwardingBridge StopTask");
299
300                    if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
301                        LOG.info("Network Could not shutdown in a timely manner");
302                    }
303                } finally {
304                    ServiceStopper ss = new ServiceStopper();
305                    ss.stop(remoteBroker);
306                    ss.stop(localBroker);
307                    ss.stop(duplexInboundLocalBroker);
308                    // Release the started Latch since another thread could be
309                    // stuck waiting for it to start up.
310                    startedLatch.countDown();
311                    startedLatch.countDown();
312                    localStartedLatch.countDown();
313
314                    ss.throwFirstException();
315                }
316            }
317
318            LOG.info("{} bridge to {} stopped", configuration.getBrokerName(), remoteBrokerName);
319        }
320    }
321
322    protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
323        brokerService.getTaskRunnerFactory().execute(new Runnable() {
324            @Override
325            public void run() {
326                final String originalName = Thread.currentThread().getName();
327                Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
328                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
329
330                try {
331                    // First we collect the info data from both the local and remote ends
332                    collectBrokerInfos();
333
334                    // Once we have all required broker info we can attempt to start
335                    // the local and then remote sides of the bridge.
336                    doStartLocalAndRemoteBridges();
337                } finally {
338                    Thread.currentThread().setName(originalName);
339                }
340            }
341        });
342    }
343
344    private void collectBrokerInfos() {
345
346        // First wait for the remote to feed us its BrokerInfo, then we can check on
347        // the LocalBrokerInfo and decide is this is a loop.
348        try {
349            remoteBrokerInfo = futureRemoteBrokerInfo.get();
350            if (remoteBrokerInfo == null) {
351                serviceLocalException(new Throwable("remoteBrokerInfo is null"));
352                return;
353            }
354        } catch (Exception e) {
355            serviceRemoteException(e);
356            return;
357        }
358
359        try {
360            localBrokerInfo = futureLocalBrokerInfo.get();
361            if (localBrokerInfo == null) {
362                serviceLocalException(new Throwable("localBrokerInfo is null"));
363                return;
364            }
365
366            // Before we try and build the bridge lets check if we are in a loop
367            // and if so just stop now before registering anything.
368            remoteBrokerId = remoteBrokerInfo.getBrokerId();
369            if (localBrokerId.equals(remoteBrokerId)) {
370                LOG.trace("{} disconnecting remote loop back connector for: {}, with id: {}", new Object[]{
371                        configuration.getBrokerName(), remoteBrokerName, remoteBrokerId
372                });
373                ServiceSupport.dispose(localBroker);
374                ServiceSupport.dispose(remoteBroker);
375                // the bridge is left in a bit of limbo, but it won't get retried
376                // in this state.
377                return;
378            }
379
380            // Fill in the remote broker's information now.
381            remoteBrokerPath[0] = remoteBrokerId;
382            remoteBrokerName = remoteBrokerInfo.getBrokerName();
383            if (configuration.isUseBrokerNamesAsIdSeed()) {
384                idGenerator = new IdGenerator(brokerService.getBrokerName() + "->" + remoteBrokerName);
385            }
386        } catch (Throwable e) {
387            serviceLocalException(e);
388        }
389    }
390
391    private void doStartLocalAndRemoteBridges() {
392
393        if (disposed.get()) {
394            return;
395        }
396
397        if (isCreatedByDuplex()) {
398            // apply remote (propagated) configuration to local duplex bridge before start
399            Properties props = null;
400            try {
401                props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
402                IntrospectionSupport.getProperties(configuration, props, null);
403                if (configuration.getExcludedDestinations() != null) {
404                    excludedDestinations = configuration.getExcludedDestinations().toArray(
405                            new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
406                }
407                if (configuration.getStaticallyIncludedDestinations() != null) {
408                    staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
409                            new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
410                }
411                if (configuration.getDynamicallyIncludedDestinations() != null) {
412                    dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
413                            new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
414                }
415            } catch (Throwable t) {
416                LOG.error("Error mapping remote configuration: {}", props, t);
417            }
418        }
419
420        try {
421            startLocalBridge();
422        } catch (Throwable e) {
423            serviceLocalException(e);
424            return;
425        }
426
427        try {
428            startRemoteBridge();
429        } catch (Throwable e) {
430            serviceRemoteException(e);
431            return;
432        }
433
434        try {
435            if (safeWaitUntilStarted()) {
436                setupStaticDestinations();
437            }
438        } catch (Throwable e) {
439            serviceLocalException(e);
440        }
441    }
442
443    private void startLocalBridge() throws Throwable {
444        if (!bridgeFailed.get() && localBridgeStarted.compareAndSet(false, true)) {
445            synchronized (this) {
446                LOG.trace("{} starting local Bridge, localBroker={}", configuration.getBrokerName(), localBroker);
447                if (!disposed.get()) {
448
449                    if (idGenerator == null) {
450                        throw new IllegalStateException("Id Generator cannot be null");
451                    }
452
453                    localConnectionInfo = new ConnectionInfo();
454                    localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
455                    localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
456                    localConnectionInfo.setClientId(localClientId);
457                    localConnectionInfo.setUserName(configuration.getUserName());
458                    localConnectionInfo.setPassword(configuration.getPassword());
459                    Transport originalTransport = remoteBroker;
460                    while (originalTransport instanceof TransportFilter) {
461                        originalTransport = ((TransportFilter) originalTransport).getNext();
462                    }
463                    if (originalTransport instanceof SslTransport) {
464                        X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
465                        localConnectionInfo.setTransportContext(peerCerts);
466                    }
467                    // sync requests that may fail
468                    Object resp = localBroker.request(localConnectionInfo);
469                    if (resp instanceof ExceptionResponse) {
470                        throw ((ExceptionResponse) resp).getException();
471                    }
472                    localSessionInfo = new SessionInfo(localConnectionInfo, 1);
473                    localBroker.oneway(localSessionInfo);
474
475                    if (configuration.isDuplex()) {
476                        // separate in-bound channel for forwards so we don't
477                        // contend with out-bound dispatch on same connection
478                        remoteBrokerInfo.setNetworkConnection(true);
479                        duplexInboundLocalBroker.oneway(remoteBrokerInfo);
480
481                        ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
482                        duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
483                        duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
484                                + configuration.getBrokerName());
485                        duplexLocalConnectionInfo.setUserName(configuration.getUserName());
486                        duplexLocalConnectionInfo.setPassword(configuration.getPassword());
487
488                        if (originalTransport instanceof SslTransport) {
489                            X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
490                            duplexLocalConnectionInfo.setTransportContext(peerCerts);
491                        }
492                        // sync requests that may fail
493                        resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
494                        if (resp instanceof ExceptionResponse) {
495                            throw ((ExceptionResponse) resp).getException();
496                        }
497                        SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
498                        duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
499                        duplexInboundLocalBroker.oneway(duplexInboundSession);
500                        duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
501                    }
502                    brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
503                    NetworkBridgeListener l = this.networkBridgeListener;
504                    if (l != null) {
505                        l.onStart(this);
506                    }
507
508                    // Let the local broker know the remote broker's ID.
509                    localBroker.oneway(remoteBrokerInfo);
510                    // new peer broker (a consumer can work with remote broker also)
511                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
512
513                    LOG.info("Network connection between {} and {} ({}) has been established.", new Object[]{
514                            localBroker, remoteBroker, remoteBrokerName
515                    });
516                    LOG.trace("{} register bridge ({}) to {}", new Object[]{
517                            configuration.getBrokerName(), this, remoteBrokerName
518                    });
519                } else {
520                    LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
521                }
522                startedLatch.countDown();
523                localStartedLatch.countDown();
524            }
525        }
526    }
527
528    protected void startRemoteBridge() throws Exception {
529        if (!bridgeFailed.get() && remoteBridgeStarted.compareAndSet(false, true)) {
530            LOG.trace("{} starting remote Bridge, remoteBroker={}", configuration.getBrokerName(), remoteBroker);
531            synchronized (this) {
532                if (!isCreatedByDuplex()) {
533                    BrokerInfo brokerInfo = new BrokerInfo();
534                    brokerInfo.setBrokerName(configuration.getBrokerName());
535                    brokerInfo.setBrokerURL(configuration.getBrokerURL());
536                    brokerInfo.setNetworkConnection(true);
537                    brokerInfo.setDuplexConnection(configuration.isDuplex());
538                    // set our properties
539                    Properties props = new Properties();
540                    IntrospectionSupport.getProperties(configuration, props, null);
541                    props.remove("networkTTL");
542                    String str = MarshallingSupport.propertiesToString(props);
543                    brokerInfo.setNetworkProperties(str);
544                    brokerInfo.setBrokerId(this.localBrokerId);
545                    remoteBroker.oneway(brokerInfo);
546                }
547                if (remoteConnectionInfo != null) {
548                    remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
549                }
550                remoteConnectionInfo = new ConnectionInfo();
551                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
552                remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
553                remoteConnectionInfo.setUserName(configuration.getUserName());
554                remoteConnectionInfo.setPassword(configuration.getPassword());
555                remoteBroker.oneway(remoteConnectionInfo);
556
557                SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
558                remoteBroker.oneway(remoteSessionInfo);
559                producerInfo = new ProducerInfo(remoteSessionInfo, 1);
560                producerInfo.setResponseRequired(false);
561                remoteBroker.oneway(producerInfo);
562                // Listen to consumer advisory messages on the remote broker to determine demand.
563                if (!configuration.isStaticBridge()) {
564                    demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
565                    // always dispatch advisory message asynchronously so that
566                    // we never block the producer broker if we are slow
567                    demandConsumerInfo.setDispatchAsync(true);
568                    String advisoryTopic = configuration.getDestinationFilter();
569                    if (configuration.isBridgeTempDestinations()) {
570                        advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
571                    }
572                    demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
573                    demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
574                    remoteBroker.oneway(demandConsumerInfo);
575                }
576                startedLatch.countDown();
577            }
578        }
579    }
580
581    @Override
582    public void serviceRemoteException(Throwable error) {
583        if (!disposed.get()) {
584            if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
585                LOG.error("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
586                        localBroker, remoteBroker, error
587                });
588            } else {
589                LOG.warn("Network connection between {} and {} shutdown due to a remote error: {}", new Object[]{
590                        localBroker, remoteBroker, error
591                });
592            }
593            LOG.debug("The remote Exception was: {}", error, error);
594            brokerService.getTaskRunnerFactory().execute(new Runnable() {
595                @Override
596                public void run() {
597                    ServiceSupport.dispose(getControllingService());
598                }
599            });
600            fireBridgeFailed(error);
601        }
602    }
603
    /**
     * Handles a single command received from the remote side of the bridge.
     * <p>
     * Nothing is done once the bridge has been disposed. Otherwise:
     * <ul>
     * <li>a message dispatch is interpreted as an advisory: its payload is applied via
     * {@code serviceRemoteConsumerAdvisory} and the message is counted/acked via {@code ackAdvisory};</li>
     * <li>a BrokerInfo completes {@code futureRemoteBrokerInfo};</li>
     * <li>a ConnectionError is routed to {@code serviceRemoteException};</li>
     * <li>any other command is only acted upon when the bridge is duplex; on a non-duplex
     * bridge KeepAliveInfo, WireFormatInfo and ShutdownInfo are silently accepted and
     * everything else is logged as unexpected.</li>
     * </ul>
     * Any Throwable raised while processing is logged at debug level and handed to
     * {@code serviceRemoteException}.
     *
     * @param command the remote command to process
     */
    protected void serviceRemoteCommand(Command command) {
        if (!disposed.get()) {
            try {
                if (command.isMessageDispatch()) {
                    // dispatched message is treated as an advisory: apply its payload, then count/ack it
                    safeWaitUntilStarted();
                    MessageDispatch md = (MessageDispatch) command;
                    serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
                    ackAdvisory(md.getMessage());
                } else if (command.isBrokerInfo()) {
                    futureRemoteBrokerInfo.set((BrokerInfo) command);
                } else if (command.getClass() == ConnectionError.class) {
                    ConnectionError ce = (ConnectionError) command;
                    serviceRemoteException(ce.getException());
                } else {
                    if (isDuplex()) {
                        LOG.trace("{} duplex command type: {}", configuration.getBrokerName(), command.getDataStructureType());
                        if (command.isMessage()) {
                            final ActiveMQMessage message = (ActiveMQMessage) command;
                            if (NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
                                serviceRemoteConsumerAdvisory(message.getDataStructure());
                                ackAdvisory(message);
                            } else {
                                // temporary destinations are always allowed here (allowTemporary == true)
                                if (!isPermissableDestination(message.getDestination(), true)) {
                                    return;
                                }
                                // message being forwarded - we need to
                                // propagate the response to our local send
                                if (canDuplexDispatch(message)) {
                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
                                            // captured up front so the reply can be correlated with the remote request
                                            final int correlationId = message.getCommandId();

                                            @Override
                                            public void onCompletion(FutureResponse resp) {
                                                try {
                                                    Response reply = resp.getResult();
                                                    reply.setCorrelationId(correlationId);
                                                    remoteBroker.oneway(reply);
                                                    //increment counter when messages are received in duplex mode
                                                    networkBridgeStatistics.getReceivedCount().increment();
                                                } catch (IOException error) {
                                                    LOG.error("Exception: {} on duplex forward of: {}", error, message);
                                                    serviceRemoteException(error);
                                                }
                                            }
                                        });
                                    } else {
                                        duplexInboundLocalBroker.oneway(message);
                                        networkBridgeStatistics.getReceivedCount().increment();
                                    }
                                    serviceInboundMessage(message);
                                } else {
                                    // not dispatched locally; still answer the sender if it is waiting for a response
                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
                                        Response reply = new Response();
                                        reply.setCorrelationId(message.getCommandId());
                                        remoteBroker.oneway(reply);
                                    }
                                }
                            }
                        } else {
                            switch (command.getDataStructureType()) {
                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
                                    if (duplexInitiatingConnection != null && duplexInitiatingConnectionInfoReceived.compareAndSet(false, true)) {
                                        // end of initiating connection setup - propagate to initial connection to get mbean by clientid
                                        duplexInitiatingConnection.processAddConnection((ConnectionInfo) command);
                                    } else {
                                        localBroker.oneway(command);
                                    }
                                    break;
                                case SessionInfo.DATA_STRUCTURE_TYPE:
                                    localBroker.oneway(command);
                                    break;
                                case ProducerInfo.DATA_STRUCTURE_TYPE:
                                    // using duplexInboundLocalProducerInfo
                                    break;
                                case MessageAck.DATA_STRUCTURE_TYPE:
                                    // translate the remote consumer id to the matching local subscription before acking locally
                                    MessageAck ack = (MessageAck) command;
                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
                                    if (localSub != null) {
                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
                                        localBroker.oneway(ack);
                                    } else {
                                        LOG.warn("Matching local subscription not found for ack: {}", ack);
                                    }
                                    break;
                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
                                    // blocks until localStartedLatch is released; an interrupt ends up in the catch below
                                    localStartedLatch.await();
                                    if (started.get()) {
                                        addConsumerInfo((ConsumerInfo) command);
                                    } else {
                                        // received a subscription whilst stopping
                                        LOG.warn("Stopping - ignoring ConsumerInfo: {}", command);
                                    }
                                    break;
                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
                                    // initiator is shutting down, controlled case
                                    // abortive close dealt with by inactivity monitor
                                    LOG.info("Stopping network bridge on shutdown of remote broker");
                                    serviceRemoteException(new IOException(command.toString()));
                                    break;
                                default:
                                    LOG.debug("Ignoring remote command: {}", command);
                            }
                        }
                    } else {
                        switch (command.getDataStructureType()) {
                            // these need no handling on a non-duplex bridge
                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
                                break;
                            default:
                                LOG.warn("Unexpected remote command: {}", command);
                        }
                    }
                }
            } catch (Throwable e) {
                LOG.debug("Exception processing remote command: {}", command, e);
                serviceRemoteException(e);
            }
        }
    }
726
727    private void ackAdvisory(Message message) throws IOException {
728        demandConsumerDispatched++;
729        if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
730            MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
731            ack.setConsumerId(demandConsumerInfo.getConsumerId());
732            remoteBroker.oneway(ack);
733            demandConsumerDispatched = 0;
734        }
735    }
736
    /**
     * Applies the payload of an advisory received from the remote broker to this bridge.
     * The action depends on the exact class of {@code data}:
     * <ul>
     * <li>{@code ConsumerInfo}: adds a demand subscription via {@code addConsumerInfo}, unless the
     * consumer is a browser, its broker path has reached the configured consumer TTL, the path
     * already contains this broker, or the destination is not permitted;</li>
     * <li>{@code DestinationInfo}: after the same TTL and loop checks, rewrites the connection id
     * and broker path and forwards the info to the local broker (removes go through
     * {@code serialExecutor}, adds are sent directly);</li>
     * <li>{@code RemoveInfo}: removes the demand subscription for the carried consumer id;</li>
     * <li>{@code RemoveSubscriptionInfo}: drops the durable from every demand subscription that
     * tracks it and tears down demand subscriptions left without any durable remote sub.</li>
     * </ul>
     * Any other payload type is ignored.
     *
     * @param data the data structure carried by the advisory message
     * @throws IOException if a command cannot be sent to the local broker
     */
    private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
        final int networkTTL = configuration.getConsumerTTL();
        if (data.getClass() == ConsumerInfo.class) {
            // Create a new local subscription
            ConsumerInfo info = (ConsumerInfo) data;
            BrokerId[] path = info.getBrokerPath();

            if (info.isBrowser()) {
                LOG.debug("{} Ignoring sub from {}, browsers explicitly suppressed", configuration.getBrokerName(), remoteBrokerName);
                return;
            }

            // a negative TTL disables the hop limit
            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
                LOG.debug("{} Ignoring sub from {}, restricted to {} network hops only: {}", new Object[]{
                        configuration.getBrokerName(), remoteBrokerName, networkTTL, info
                });
                return;
            }

            if (contains(path, localBrokerPath[0])) {
                // Ignore this consumer as it's a consumer we locally sent to the broker.
                LOG.debug("{} Ignoring sub from {}, already routed through this broker once: {}", new Object[]{
                        configuration.getBrokerName(), remoteBrokerName, info
                });
                return;
            }

            if (!isPermissableDestination(info.getDestination())) {
                // ignore if not in the permitted or in the excluded list
                LOG.debug("{} Ignoring sub from {}, destination {} is not permitted: {}", new Object[]{
                        configuration.getBrokerName(), remoteBrokerName, info.getDestination(), info
                });
                return;
            }

            // in a cyclic network there can be multiple bridges per broker that can propagate
            // a network subscription so there is a need to synchronize on a shared entity
            synchronized (brokerService.getVmConnectorURI()) {
                addConsumerInfo(info);
            }
        } else if (data.getClass() == DestinationInfo.class) {
            // It's a destination info - we want to pass up information about temporary destinations
            final DestinationInfo destInfo = (DestinationInfo) data;
            BrokerId[] path = destInfo.getBrokerPath();
            // a negative TTL disables the hop limit
            if (path != null && networkTTL > -1 && path.length >= networkTTL) {
                LOG.debug("{} Ignoring destination {} restricted to {} network hops only", new Object[]{
                        configuration.getBrokerName(), destInfo, networkTTL
                });
                return;
            }
            if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
                LOG.debug("{} Ignoring destination {} already routed through this broker once", configuration.getBrokerName(), destInfo);
                return;
            }
            destInfo.setConnectionId(localConnectionInfo.getConnectionId());
            if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
                // re-set connection id so comes from here
                ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
                tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
            }
            destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
            LOG.trace("{} bridging {} destination on {} from {}, destination: {}", new Object[]{
                    configuration.getBrokerName(), (destInfo.isAddOperation() ? "add" : "remove"), localBroker, remoteBrokerName, destInfo
            });
            if (destInfo.isRemoveOperation()) {
                // Serialize with removeSub operations such that all removeSub advisories
                // are generated
                serialExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            localBroker.oneway(destInfo);
                        } catch (IOException e) {
                            LOG.warn("failed to deliver remove command for destination: {}", destInfo.getDestination(), e);
                        }
                    }
                });
            } else {
                localBroker.oneway(destInfo);
            }
        } else if (data.getClass() == RemoveInfo.class) {
            // the object id is cast to ConsumerId without a type check
            ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
            removeDemandSubscription(id);
        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
            RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
            SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
            // explicit iterator so that emptied demand subscriptions can be removed while iterating
            for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                DemandSubscription ds = i.next();
                boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
                if (removed) {
                    // only tear down the local durable once no remote durable depends on it any more
                    if (ds.getDurableRemoteSubs().isEmpty()) {

                        // deactivate subscriber
                        RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
                        localBroker.oneway(removeInfo);

                        // remove subscriber
                        RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
                        sending.setClientId(localClientId);
                        sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
                        sending.setConnectionId(this.localConnectionInfo.getConnectionId());
                        localBroker.oneway(sending);

                        //remove subscriber from map
                        i.remove();
                    }
                }
            }
        }
    }
847
848    @Override
849    public void serviceLocalException(Throwable error) {
850        serviceLocalException(null, error);
851    }
852
853    public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
854        LOG.trace("serviceLocalException: disposed {} ex", disposed.get(), error);
855        if (!disposed.get()) {
856            if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
857                // not a reason to terminate the bridge - temps can disappear with
858                // pending sends as the demand sub may outlive the remote dest
859                if (messageDispatch != null) {
860                    LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
861                    try {
862                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
863                        poisonAck.setPoisonCause(error);
864                        localBroker.oneway(poisonAck);
865                    } catch (IOException ioe) {
866                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
867                    }
868                    fireFailedForwardAdvisory(messageDispatch, error);
869                } else {
870                    LOG.warn("Ignoring exception on forwarding to non existent temp dest: ", error);
871                }
872                return;
873            }
874
875            LOG.info("Network connection between {} and {} shutdown due to a local error: {}", new Object[]{localBroker, remoteBroker, error});
876            LOG.debug("The local Exception was: {}", error, error);
877
878            brokerService.getTaskRunnerFactory().execute(new Runnable() {
879                @Override
880                public void run() {
881                    ServiceSupport.dispose(getControllingService());
882                }
883            });
884            fireBridgeFailed(error);
885        }
886    }
887
888    private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
889        if (configuration.isAdvisoryForFailedForward()) {
890            AdvisoryBroker advisoryBroker = null;
891            try {
892                advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
893
894                if (advisoryBroker != null) {
895                    ConnectionContext context = new ConnectionContext();
896                    context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
897                    context.setBroker(brokerService.getBroker());
898
899                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
900                    advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
901                    advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
902                            advisoryMessage);
903
904                }
905            } catch (Exception e) {
906                LOG.warn("failed to fire forward failure advisory, cause: {}", e);
907                LOG.debug("detail", e);
908            }
909        }
910    }
911
912    protected Service getControllingService() {
913        return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
914    }
915
916    protected void addSubscription(DemandSubscription sub) throws IOException {
917        if (sub != null) {
918            if (isDuplex()) {
919                // async vm transport, need to wait for completion
920                localBroker.request(sub.getLocalInfo());
921            } else {
922                localBroker.oneway(sub.getLocalInfo());
923            }
924        }
925    }
926
927    protected void removeSubscription(final DemandSubscription sub) throws IOException {
928        if (sub != null) {
929            LOG.trace("{} remove local subscription: {} for remote {}", new Object[]{configuration.getBrokerName(), sub.getLocalInfo().getConsumerId(), sub.getRemoteInfo().getConsumerId()});
930
931            // ensure not available for conduit subs pending removal
932            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
933            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
934
935            // continue removal in separate thread to free up this thread for outstanding responses
936            // Serialize with removeDestination operations so that removeSubs are serialized with
937            // removeDestinations such that all removeSub advisories are generated
938            serialExecutor.execute(new Runnable() {
939                @Override
940                public void run() {
941                    sub.waitForCompletion();
942                    try {
943                        localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
944                    } catch (IOException e) {
945                        LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
946                    }
947                }
948            });
949        }
950    }
951
952    protected Message configureMessage(MessageDispatch md) throws IOException {
953        Message message = md.getMessage().copy();
954        // Update the packet to show where it came from.
955        message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
956        message.setProducerId(producerInfo.getProducerId());
957        message.setDestination(md.getDestination());
958        message.setMemoryUsage(null);
959        if (message.getOriginalTransactionId() == null) {
960            message.setOriginalTransactionId(message.getTransactionId());
961        }
962        message.setTransactionId(null);
963        if (configuration.isUseCompression()) {
964            message.compress();
965        }
966        return message;
967    }
968
969    protected void serviceLocalCommand(Command command) {
970        if (!disposed.get()) {
971            try {
972                if (command.isMessageDispatch()) {
973                    safeWaitUntilStarted();
974                    networkBridgeStatistics.getEnqueues().increment();
975                    final MessageDispatch md = (MessageDispatch) command;
976                    final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
977                    if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
978
979                        if (suppressMessageDispatch(md, sub)) {
980                            LOG.debug("{} message not forwarded to {} because message came from there or fails TTL, brokerPath: {}, message: {}", new Object[]{
981                                    configuration.getBrokerName(), remoteBrokerName, Arrays.toString(md.getMessage().getBrokerPath()), md.getMessage()
982                            });
983                            // still ack as it may be durable
984                            try {
985                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
986                            } finally {
987                                sub.decrementOutstandingResponses();
988                            }
989                            return;
990                        }
991
992                        Message message = configureMessage(md);
993                        LOG.debug("bridging ({} -> {}), consumer: {}, destination: {}, brokerPath: {}, message: {}", new Object[]{
994                                configuration.getBrokerName(), remoteBrokerName, (LOG.isTraceEnabled() ? message : message.getMessageId()), md.getConsumerId(), message.getDestination(), Arrays.toString(message.getBrokerPath()), message
995                        });
996
997                        if (isDuplex() && NetworkBridgeFilter.isAdvisoryInterpretedByNetworkBridge(message)) {
998                            try {
999                                // never request b/c they are eventually acked async
1000                                remoteBroker.oneway(message);
1001                            } finally {
1002                                sub.decrementOutstandingResponses();
1003                            }
1004                            return;
1005                        }
1006
1007                        if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
1008
1009                            // The message was not sent using async send, so we should only
1010                            // ack the local broker when we get confirmation that the remote
1011                            // broker has received the message.
1012                            remoteBroker.asyncRequest(message, new ResponseCallback() {
1013                                @Override
1014                                public void onCompletion(FutureResponse future) {
1015                                    try {
1016                                        Response response = future.getResult();
1017                                        if (response.isException()) {
1018                                            ExceptionResponse er = (ExceptionResponse) response;
1019                                            serviceLocalException(md, er.getException());
1020                                        } else {
1021                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1022                                            networkBridgeStatistics.getDequeues().increment();
1023                                        }
1024                                    } catch (IOException e) {
1025                                        serviceLocalException(md, e);
1026                                    } finally {
1027                                        sub.decrementOutstandingResponses();
1028                                    }
1029                                }
1030                            });
1031
1032                        } else {
1033                            // If the message was originally sent using async send, we will
1034                            // preserve that QOS by bridging it using an async send (small chance
1035                            // of message loss).
1036                            try {
1037                                remoteBroker.oneway(message);
1038                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1039                                networkBridgeStatistics.getDequeues().increment();
1040                            } finally {
1041                                sub.decrementOutstandingResponses();
1042                            }
1043                        }
1044                        serviceOutbound(message);
1045                    } else {
1046                        LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
1047                    }
1048                } else if (command.isBrokerInfo()) {
1049                    futureLocalBrokerInfo.set((BrokerInfo) command);
1050                } else if (command.isShutdownInfo()) {
1051                    LOG.info("{} Shutting down {}", configuration.getBrokerName(), configuration.getName());
1052                    stop();
1053                } else if (command.getClass() == ConnectionError.class) {
1054                    ConnectionError ce = (ConnectionError) command;
1055                    serviceLocalException(ce.getException());
1056                } else {
1057                    switch (command.getDataStructureType()) {
1058                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
1059                            break;
1060                        default:
1061                            LOG.warn("Unexpected local command: {}", command);
1062                    }
1063                }
1064            } catch (Throwable e) {
1065                LOG.warn("Caught an exception processing local command", e);
1066                serviceLocalException(e);
1067            }
1068        }
1069    }
1070
1071    private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1072        boolean suppress = false;
1073        // for durable subs, suppression via filter leaves dangling acks so we
1074        // need to check here and allow the ack irrespective
1075        if (sub.getLocalInfo().isDurable()) {
1076            MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
1077            messageEvalContext.setMessageReference(md.getMessage());
1078            messageEvalContext.setDestination(md.getDestination());
1079            suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1080        }
1081        return suppress;
1082    }
1083
1084    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1085        if (brokerPath != null) {
1086            for (BrokerId id : brokerPath) {
1087                if (brokerId.equals(id)) {
1088                    return true;
1089                }
1090            }
1091        }
1092        return false;
1093    }
1094
1095    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1096        if (brokerPath == null || brokerPath.length == 0) {
1097            return pathsToAppend;
1098        }
1099        BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1100        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1101        System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1102        return rc;
1103    }
1104
1105    protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1106        if (brokerPath == null || brokerPath.length == 0) {
1107            return new BrokerId[]{idToAppend};
1108        }
1109        BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1110        System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1111        rc[brokerPath.length] = idToAppend;
1112        return rc;
1113    }
1114
1115    protected boolean isPermissableDestination(ActiveMQDestination destination) {
1116        return isPermissableDestination(destination, false);
1117    }
1118
1119    protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1120        // Are we not bridging temporary destinations?
1121        if (destination.isTemporary()) {
1122            if (allowTemporary) {
1123                return true;
1124            } else {
1125                return configuration.isBridgeTempDestinations();
1126            }
1127        }
1128
1129        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1130        if (dests != null && dests.length > 0) {
1131            for (ActiveMQDestination dest : dests) {
1132                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1133                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1134                    return true;
1135                }
1136            }
1137        }
1138
1139        dests = excludedDestinations;
1140        if (dests != null && dests.length > 0) {
1141            for (ActiveMQDestination dest : dests) {
1142                DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1143                if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1144                    return false;
1145                }
1146            }
1147        }
1148
1149        dests = dynamicallyIncludedDestinations;
1150        if (dests != null && dests.length > 0) {
1151            for (ActiveMQDestination dest : dests) {
1152                DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1153                if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1154                    return true;
1155                }
1156            }
1157
1158            return false;
1159        }
1160        return true;
1161    }
1162
1163    /**
1164     * Subscriptions for these destinations are always created
1165     */
1166    protected void setupStaticDestinations() {
1167        ActiveMQDestination[] dests = staticallyIncludedDestinations;
1168        if (dests != null) {
1169            for (ActiveMQDestination dest : dests) {
1170                DemandSubscription sub = createDemandSubscription(dest);
1171                sub.setStaticallyIncluded(true);
1172                try {
1173                    addSubscription(sub);
1174                } catch (IOException e) {
1175                    LOG.error("Failed to add static destination {}", dest, e);
1176                }
1177                LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
1178            }
1179        }
1180    }
1181
1182    protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1183        ConsumerInfo info = consumerInfo.copy();
1184        addRemoteBrokerToBrokerPath(info);
1185        DemandSubscription sub = createDemandSubscription(info);
1186        if (sub != null) {
1187            if (duplicateSuppressionIsRequired(sub)) {
1188                undoMapRegistration(sub);
1189            } else {
1190                if (consumerInfo.isDurable()) {
1191                    sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
1192                }
1193                addSubscription(sub);
1194                LOG.debug("{} new demand subscription: {}", configuration.getBrokerName(), sub);
1195            }
1196        }
1197    }
1198
1199    private void undoMapRegistration(DemandSubscription sub) {
1200        subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1201        subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1202    }
1203
1204    /*
1205     * check our existing subs networkConsumerIds against the list of network
1206     * ids in this subscription A match means a duplicate which we suppress for
1207     * topics and maybe for queues
1208     */
1209    private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1210        final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1211        boolean suppress = false;
1212
1213        if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
1214                && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1215            return suppress;
1216        }
1217
1218        List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1219        Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1220        for (Subscription sub : currentSubs) {
1221            List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1222            if (!networkConsumers.isEmpty()) {
1223                if (matchFound(candidateConsumers, networkConsumers)) {
1224                    if (isInActiveDurableSub(sub)) {
1225                        suppress = false;
1226                    } else {
1227                        suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1228                    }
1229                    break;
1230                }
1231            }
1232        }
1233        return suppress;
1234    }
1235
1236    private boolean isInActiveDurableSub(Subscription sub) {
1237        return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1238    }
1239
1240    private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1241        boolean suppress = false;
1242
1243        if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1244            LOG.debug("{} Ignoring duplicate subscription from {}, sub: {} is duplicate by network subscription with equal or higher network priority: {}, networkConsumerIds: {}", new Object[]{
1245                    configuration.getBrokerName(), remoteBrokerName, candidateInfo, existingSub, existingSub.getConsumerInfo().getNetworkConsumerIds()
1246            });
1247            suppress = true;
1248        } else {
1249            // remove the existing lower priority duplicate and allow this candidate
1250            try {
1251                removeDuplicateSubscription(existingSub);
1252
1253                LOG.debug("{} Replacing duplicate subscription {} with sub from {}, which has a higher priority, new sub: {}, networkConsumerIds: {}", new Object[]{
1254                        configuration.getBrokerName(), existingSub.getConsumerInfo(), remoteBrokerName, candidateInfo, candidateInfo.getNetworkConsumerIds()
1255                });
1256            } catch (IOException e) {
1257                LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: {}", existingSub, e);
1258            }
1259        }
1260        return suppress;
1261    }
1262
1263    private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1264        for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1265            if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1266                break;
1267            }
1268        }
1269    }
1270
1271    private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1272        boolean found = false;
1273        for (ConsumerId aliasConsumer : networkConsumers) {
1274            if (candidateConsumers.contains(aliasConsumer)) {
1275                found = true;
1276                break;
1277            }
1278        }
1279        return found;
1280    }
1281
1282    protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1283        RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1284        Region region;
1285        Collection<Subscription> subs;
1286
1287        region = null;
1288        switch (dest.getDestinationType()) {
1289            case ActiveMQDestination.QUEUE_TYPE:
1290                region = region_broker.getQueueRegion();
1291                break;
1292            case ActiveMQDestination.TOPIC_TYPE:
1293                region = region_broker.getTopicRegion();
1294                break;
1295            case ActiveMQDestination.TEMP_QUEUE_TYPE:
1296                region = region_broker.getTempQueueRegion();
1297                break;
1298            case ActiveMQDestination.TEMP_TOPIC_TYPE:
1299                region = region_broker.getTempTopicRegion();
1300                break;
1301        }
1302
1303        if (region instanceof AbstractRegion) {
1304            subs = ((AbstractRegion) region).getSubscriptions().values();
1305        } else {
1306            subs = null;
1307        }
1308
1309        return subs;
1310    }
1311
1312    protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1313        // add our original id to ourselves
1314        info.addNetworkConsumerId(info.getConsumerId());
1315        return doCreateDemandSubscription(info);
1316    }
1317
1318    protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1319        DemandSubscription result = new DemandSubscription(info);
1320        result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1321        if (info.getDestination().isTemporary()) {
1322            // reset the local connection Id
1323            ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1324            dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1325        }
1326
1327        if (configuration.isDecreaseNetworkConsumerPriority()) {
1328            byte priority = (byte) configuration.getConsumerPriorityBase();
1329            if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1330                // The longer the path to the consumer, the less it's consumer priority.
1331                priority -= info.getBrokerPath().length + 1;
1332            }
1333            result.getLocalInfo().setPriority(priority);
1334            LOG.debug("{} using priority: {} for subscription: {}", new Object[]{configuration.getBrokerName(), priority, info});
1335        }
1336        configureDemandSubscription(info, result);
1337        return result;
1338    }
1339
1340    final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1341        ConsumerInfo info = new ConsumerInfo();
1342        info.setNetworkSubscription(true);
1343        info.setDestination(destination);
1344
1345        // Indicate that this subscription is being made on behalf of the remote broker.
1346        info.setBrokerPath(new BrokerId[]{remoteBrokerId});
1347
1348        // the remote info held by the DemandSubscription holds the original
1349        // consumerId, the local info get's overwritten
1350        info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1351        DemandSubscription result = null;
1352        try {
1353            result = createDemandSubscription(info);
1354        } catch (IOException e) {
1355            LOG.error("Failed to create DemandSubscription ", e);
1356        }
1357        return result;
1358    }
1359
1360    protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1361        if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) ||
1362                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) {
1363            sub.getLocalInfo().setDispatchAsync(true);
1364        } else {
1365            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1366        }
1367        sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1368        subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1369        subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1370
1371        sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1372        if (!info.isDurable()) {
1373            // This works for now since we use a VM connection to the local broker.
1374            // may need to change if we ever subscribe to a remote broker.
1375            sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1376        } else {
1377            sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
1378        }
1379    }
1380
1381    protected void removeDemandSubscription(ConsumerId id) throws IOException {
1382        DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1383        LOG.debug("{} remove request on {} from {}, consumer id: {}, matching sub: {}", new Object[]{
1384                configuration.getBrokerName(), localBroker, remoteBrokerName, id, sub
1385        });
1386        if (sub != null) {
1387            removeSubscription(sub);
1388            LOG.debug("{} removed sub on {} from {}: {}", new Object[]{
1389                    configuration.getBrokerName(), localBroker, remoteBrokerName, sub.getRemoteInfo()
1390            });
1391        }
1392    }
1393
1394    protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1395        boolean removeDone = false;
1396        DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1397        if (sub != null) {
1398            try {
1399                removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1400                removeDone = true;
1401            } catch (IOException e) {
1402                LOG.debug("removeDemandSubscriptionByLocalId failed for localId: {}", consumerId, e);
1403            }
1404        }
1405        return removeDone;
1406    }
1407
1408    /**
1409     * Performs a timed wait on the started latch and then checks for disposed
1410     * before performing another wait each time the the started wait times out.
1411     */
1412    protected boolean safeWaitUntilStarted() throws InterruptedException {
1413        while (!disposed.get()) {
1414            if (startedLatch.await(1, TimeUnit.SECONDS)) {
1415                break;
1416            }
1417        }
1418        return !disposed.get();
1419    }
1420
1421    protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1422        NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1423        if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1424            PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1425            if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1426                filterFactory = entry.getNetworkBridgeFilterFactory();
1427            }
1428        }
1429        return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
1430    }
1431
1432    protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1433        info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1434    }
1435
1436    protected BrokerId[] getRemoteBrokerPath() {
1437        return remoteBrokerPath;
1438    }
1439
1440    @Override
1441    public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1442        this.networkBridgeListener = listener;
1443    }
1444
1445    private void fireBridgeFailed(Throwable reason) {
1446        LOG.trace("fire bridge failed, listener: {}", this.networkBridgeListener, reason);
1447        NetworkBridgeListener l = this.networkBridgeListener;
1448        if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1449            l.bridgeFailed();
1450        }
1451    }
1452
1453    /**
1454     * @return Returns the dynamicallyIncludedDestinations.
1455     */
1456    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1457        return dynamicallyIncludedDestinations;
1458    }
1459
1460    /**
1461     * @param dynamicallyIncludedDestinations
1462     *         The dynamicallyIncludedDestinations to set.
1463     */
1464    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1465        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1466    }
1467
1468    /**
1469     * @return Returns the excludedDestinations.
1470     */
1471    public ActiveMQDestination[] getExcludedDestinations() {
1472        return excludedDestinations;
1473    }
1474
1475    /**
1476     * @param excludedDestinations The excludedDestinations to set.
1477     */
1478    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1479        this.excludedDestinations = excludedDestinations;
1480    }
1481
1482    /**
1483     * @return Returns the staticallyIncludedDestinations.
1484     */
1485    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1486        return staticallyIncludedDestinations;
1487    }
1488
1489    /**
1490     * @param staticallyIncludedDestinations The staticallyIncludedDestinations to set.
1491     */
1492    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1493        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1494    }
1495
1496    /**
1497     * @return Returns the durableDestinations.
1498     */
1499    public ActiveMQDestination[] getDurableDestinations() {
1500        return durableDestinations;
1501    }
1502
1503    /**
1504     * @param durableDestinations The durableDestinations to set.
1505     */
1506    public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1507        this.durableDestinations = durableDestinations;
1508    }
1509
1510    /**
1511     * @return Returns the localBroker.
1512     */
1513    public Transport getLocalBroker() {
1514        return localBroker;
1515    }
1516
1517    /**
1518     * @return Returns the remoteBroker.
1519     */
1520    public Transport getRemoteBroker() {
1521        return remoteBroker;
1522    }
1523
1524    /**
1525     * @return the createdByDuplex
1526     */
1527    public boolean isCreatedByDuplex() {
1528        return this.createdByDuplex;
1529    }
1530
1531    /**
1532     * @param createdByDuplex the createdByDuplex to set
1533     */
1534    public void setCreatedByDuplex(boolean createdByDuplex) {
1535        this.createdByDuplex = createdByDuplex;
1536    }
1537
1538    @Override
1539    public String getRemoteAddress() {
1540        return remoteBroker.getRemoteAddress();
1541    }
1542
1543    @Override
1544    public String getLocalAddress() {
1545        return localBroker.getRemoteAddress();
1546    }
1547
1548    @Override
1549    public String getRemoteBrokerName() {
1550        return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1551    }
1552
1553    @Override
1554    public String getRemoteBrokerId() {
1555        return (remoteBrokerInfo == null || remoteBrokerInfo.getBrokerId() == null) ? null : remoteBrokerInfo.getBrokerId().toString();
1556    }
1557
1558    @Override
1559    public String getLocalBrokerName() {
1560        return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1561    }
1562
1563    @Override
1564    public long getDequeueCounter() {
1565        return networkBridgeStatistics.getDequeues().getCount();
1566    }
1567
1568    @Override
1569    public long getEnqueueCounter() {
1570        return networkBridgeStatistics.getEnqueues().getCount();
1571    }
1572
1573    @Override
1574    public NetworkBridgeStatistics getNetworkBridgeStatistics() {
1575        return networkBridgeStatistics;
1576    }
1577
1578    protected boolean isDuplex() {
1579        return configuration.isDuplex() || createdByDuplex;
1580    }
1581
1582    public ConcurrentMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1583        return subscriptionMapByRemoteId;
1584    }
1585
1586    @Override
1587    public void setBrokerService(BrokerService brokerService) {
1588        this.brokerService = brokerService;
1589        this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1590        localBrokerPath[0] = localBrokerId;
1591    }
1592
1593    @Override
1594    public void setMbeanObjectName(ObjectName objectName) {
1595        this.mbeanObjectName = objectName;
1596    }
1597
1598    @Override
1599    public ObjectName getMbeanObjectName() {
1600        return mbeanObjectName;
1601    }
1602
1603    @Override
1604    public void resetStats() {
1605        networkBridgeStatistics.reset();
1606    }
1607
1608    /*
1609     * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1610     * remote sides of the network bridge.
1611     */
1612    private static class FutureBrokerInfo implements Future<BrokerInfo> {
1613
1614        private final CountDownLatch slot = new CountDownLatch(1);
1615        private final AtomicBoolean disposed;
1616        private volatile BrokerInfo info = null;
1617
1618        public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1619            this.info = info;
1620            this.disposed = disposed;
1621        }
1622
1623        @Override
1624        public boolean cancel(boolean mayInterruptIfRunning) {
1625            slot.countDown();
1626            return true;
1627        }
1628
1629        @Override
1630        public boolean isCancelled() {
1631            return slot.getCount() == 0 && info == null;
1632        }
1633
1634        @Override
1635        public boolean isDone() {
1636            return info != null;
1637        }
1638
1639        @Override
1640        public BrokerInfo get() throws InterruptedException, ExecutionException {
1641            try {
1642                if (info == null) {
1643                    while (!disposed.get()) {
1644                        if (slot.await(1, TimeUnit.SECONDS)) {
1645                            break;
1646                        }
1647                    }
1648                }
1649                return info;
1650            } catch (InterruptedException e) {
1651                Thread.currentThread().interrupt();
1652                LOG.debug("Operation interrupted: {}", e, e);
1653                throw new InterruptedException("Interrupted.");
1654            }
1655        }
1656
1657        @Override
1658        public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1659            try {
1660                if (info == null) {
1661                    long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1662
1663                    while (!disposed.get() || System.currentTimeMillis() < deadline) {
1664                        if (slot.await(1, TimeUnit.MILLISECONDS)) {
1665                            break;
1666                        }
1667                    }
1668                    if (info == null) {
1669                        throw new TimeoutException();
1670                    }
1671                }
1672                return info;
1673            } catch (InterruptedException e) {
1674                throw new InterruptedException("Interrupted.");
1675            }
1676        }
1677
1678        public void set(BrokerInfo info) {
1679            this.info = info;
1680            this.slot.countDown();
1681        }
1682    }
1683
1684    protected void serviceOutbound(Message message) {
1685        NetworkBridgeListener l = this.networkBridgeListener;
1686        if (l != null) {
1687            l.onOutboundMessage(this, message);
1688        }
1689    }
1690
1691    protected void serviceInboundMessage(Message message) {
1692        NetworkBridgeListener l = this.networkBridgeListener;
1693        if (l != null) {
1694            l.onInboundMessage(this, message);
1695        }
1696    }
1697
1698    protected boolean canDuplexDispatch(Message message) {
1699        boolean result = true;
1700        if (configuration.isCheckDuplicateMessagesOnDuplex()){
1701            final long producerSequenceId = message.getMessageId().getProducerSequenceId();
1702            //  messages are multiplexed on this producer so we need to query the persistenceAdapter
1703            long lastStoredForMessageProducer = getStoredSequenceIdForMessage(message.getMessageId());
1704            if (producerSequenceId <= lastStoredForMessageProducer) {
1705                result = false;
1706                LOG.debug("suppressing duplicate message send [{}] from network producer with producerSequence [{}] less than last stored: {}", new Object[]{
1707                        (LOG.isTraceEnabled() ? message : message.getMessageId()), producerSequenceId, lastStoredForMessageProducer
1708                });
1709            }
1710        }
1711        return result;
1712    }
1713
1714    protected long getStoredSequenceIdForMessage(MessageId messageId) {
1715        try {
1716            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
1717        } catch (IOException ignored) {
1718            LOG.debug("Failed to determine last producer sequence id for: {}", messageId, ignored);
1719        }
1720        return -1;
1721    }
1722
1723}