001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.io.IOException;
020    import java.security.GeneralSecurityException;
021    import java.security.cert.X509Certificate;
022    import java.util.Arrays;
023    import java.util.Collection;
024    import java.util.List;
025    import java.util.Properties;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.CountDownLatch;
028    import java.util.concurrent.ExecutionException;
029    import java.util.concurrent.ExecutorService;
030    import java.util.concurrent.Executors;
031    import java.util.concurrent.Future;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.TimeoutException;
034    import java.util.concurrent.atomic.AtomicBoolean;
035    import java.util.concurrent.atomic.AtomicLong;
036    
037    import javax.management.ObjectName;
038    
039    import org.apache.activemq.DestinationDoesNotExistException;
040    import org.apache.activemq.Service;
041    import org.apache.activemq.advisory.AdvisoryBroker;
042    import org.apache.activemq.advisory.AdvisorySupport;
043    import org.apache.activemq.broker.BrokerService;
044    import org.apache.activemq.broker.BrokerServiceAware;
045    import org.apache.activemq.broker.ConnectionContext;
046    import org.apache.activemq.broker.TransportConnection;
047    import org.apache.activemq.broker.region.AbstractRegion;
048    import org.apache.activemq.broker.region.DurableTopicSubscription;
049    import org.apache.activemq.broker.region.Region;
050    import org.apache.activemq.broker.region.RegionBroker;
051    import org.apache.activemq.broker.region.Subscription;
052    import org.apache.activemq.broker.region.policy.PolicyEntry;
053    import org.apache.activemq.command.ActiveMQDestination;
054    import org.apache.activemq.command.ActiveMQMessage;
055    import org.apache.activemq.command.ActiveMQTempDestination;
056    import org.apache.activemq.command.ActiveMQTopic;
057    import org.apache.activemq.command.BrokerId;
058    import org.apache.activemq.command.BrokerInfo;
059    import org.apache.activemq.command.Command;
060    import org.apache.activemq.command.ConnectionError;
061    import org.apache.activemq.command.ConnectionId;
062    import org.apache.activemq.command.ConnectionInfo;
063    import org.apache.activemq.command.ConsumerId;
064    import org.apache.activemq.command.ConsumerInfo;
065    import org.apache.activemq.command.DataStructure;
066    import org.apache.activemq.command.DestinationInfo;
067    import org.apache.activemq.command.ExceptionResponse;
068    import org.apache.activemq.command.KeepAliveInfo;
069    import org.apache.activemq.command.Message;
070    import org.apache.activemq.command.MessageAck;
071    import org.apache.activemq.command.MessageDispatch;
072    import org.apache.activemq.command.NetworkBridgeFilter;
073    import org.apache.activemq.command.ProducerInfo;
074    import org.apache.activemq.command.RemoveInfo;
075    import org.apache.activemq.command.Response;
076    import org.apache.activemq.command.SessionInfo;
077    import org.apache.activemq.command.ShutdownInfo;
078    import org.apache.activemq.command.WireFormatInfo;
079    import org.apache.activemq.filter.DestinationFilter;
080    import org.apache.activemq.filter.MessageEvaluationContext;
081    import org.apache.activemq.security.SecurityContext;
082    import org.apache.activemq.transport.DefaultTransportListener;
083    import org.apache.activemq.transport.FutureResponse;
084    import org.apache.activemq.transport.ResponseCallback;
085    import org.apache.activemq.transport.Transport;
086    import org.apache.activemq.transport.TransportDisposedIOException;
087    import org.apache.activemq.transport.TransportFilter;
088    import org.apache.activemq.transport.tcp.SslTransport;
089    import org.apache.activemq.util.IdGenerator;
090    import org.apache.activemq.util.IntrospectionSupport;
091    import org.apache.activemq.util.LongSequenceGenerator;
092    import org.apache.activemq.util.MarshallingSupport;
093    import org.apache.activemq.util.ServiceStopper;
094    import org.apache.activemq.util.ServiceSupport;
095    import org.slf4j.Logger;
096    import org.slf4j.LoggerFactory;
097    
098    /**
099     * A useful base class for implementing demand forwarding bridges.
100     */
101    public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
    // NOTE(review): presumably the name prefix for durable subscriptions created on behalf of
    // a network connector; its use is not visible in this part of the file -- confirm.
    protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
    // Transports to the local and remote brokers; both are supplied through the constructor
    // and get their listeners installed in start().
    protected final Transport localBroker;
    protected final Transport remoteBroker;
    // Source of the connection ids generated in startLocalBridge() / startRemoteBridge().
    protected final IdGenerator idGenerator = new IdGenerator();
    // Sequence generator for consumer ids (used outside the visible part of the file).
    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    // Connection/session on the local broker, created in startLocalBridge().
    protected ConnectionInfo localConnectionInfo;
    // Connection on the remote broker, (re)created in startRemoteBridge().
    protected ConnectionInfo remoteConnectionInfo;
    protected SessionInfo localSessionInfo;
    // Producer registered on the remote broker in startRemoteBridge().
    protected ProducerInfo producerInfo;
    // Stays "Unknown" until collectBrokerInfos() copies the name from the remote BrokerInfo.
    protected String remoteBrokerName = "Unknown";
    // Client id used for the local connection; computed in startLocalBridge().
    protected String localClientId;
    // Advisory consumer on the remote broker; only created when the bridge is not static.
    protected ConsumerInfo demandConsumerInfo;
    protected int demandConsumerDispatched;
    // compareAndSet guards so that each side of the bridge is started at most once at a time.
    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
    protected final AtomicBoolean bridgeFailed = new AtomicBoolean();
    // Set to true by stop(); most service methods become no-ops once it is set.
    protected final AtomicBoolean disposed = new AtomicBoolean();
    protected BrokerId localBrokerId;
    // Destination lists; for bridges created by a duplex connection the excluded, dynamically
    // included and statically included lists are re-read from the configuration in
    // doStartLocalAndRemoteBridges().
    protected ActiveMQDestination[] excludedDestinations;
    protected ActiveMQDestination[] dynamicallyIncludedDestinations;
    protected ActiveMQDestination[] staticallyIncludedDestinations;
    protected ActiveMQDestination[] durableDestinations;
    // Demand subscriptions indexed by consumer id (local side and remote side respectively).
    protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
    protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
    // Single-element broker path for the local broker.
    protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
    // Counts down once in startLocalBridge() and once in startRemoteBridge(); stop() releases
    // any remaining count so that waiters are never left blocked.
    protected final CountDownLatch startedLatch = new CountDownLatch(2);
    // Counts down when startLocalBridge() finishes its synchronized section (or on stop()).
    protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
    // Bridge settings; assigned in the constructor.
    protected NetworkBridgeConfiguration configuration;
    protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();

    // Single-element broker path for the remote broker; slot 0 is filled in collectBrokerInfos().
    protected final BrokerId remoteBrokerPath[] = new BrokerId[] { null };
    protected BrokerId remoteBrokerId;

    // Message counters; they are updated outside the visible part of the file.
    final AtomicLong enqueueCounter = new AtomicLong();
    final AtomicLong dequeueCounter = new AtomicLong();

    // Optional listener notified from startLocalBridge() (onStart) and stop() (onStop).
    private NetworkBridgeListener networkBridgeListener;
    private boolean createdByDuplex;
    // Broker descriptions; set by duplexStart() and overwritten by collectBrokerInfos().
    private BrokerInfo localBrokerInfo;
    private BrokerInfo remoteBrokerInfo;

    // Futures that collectBrokerInfos() blocks on; the remote one is completed from
    // serviceRemoteCommand() when a BrokerInfo command arrives. Both BrokerInfo fields above
    // are still null when these initializers run, so null is what gets passed here.
    private final FutureBrokerInfo futureRemoteBrokerInfo = new FutureBrokerInfo(remoteBrokerInfo, disposed);
    private final FutureBrokerInfo futureLocalBrokerInfo = new FutureBrokerInfo(localBrokerInfo, disposed);

    // Lifecycle flag toggled by start() and stop().
    private final AtomicBoolean started = new AtomicBoolean();
    // Connection that initiated this bridge when it is started through duplexStart().
    private TransportConnection duplexInitiatingConnection;
    // Must be non-null before start() is called, otherwise start() throws.
    private BrokerService brokerService = null;
    private ObjectName mbeanObjectName;
    // Single-threaded executor; shut down (with a 5 second grace period) in stop().
    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
    // Extra local transport created in start() for duplex bridges only; null otherwise.
    private Transport duplexInboundLocalBroker = null;
    // Producer registered on duplexInboundLocalBroker in startLocalBridge().
    private ProducerInfo duplexInboundLocalProducerInfo;
155    
156        public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
157            this.configuration = configuration;
158            this.localBroker = localBroker;
159            this.remoteBroker = remoteBroker;
160        }
161    
162        public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
163            this.localBrokerInfo = localBrokerInfo;
164            this.remoteBrokerInfo = remoteBrokerInfo;
165            this.duplexInitiatingConnection = connection;
166            start();
167            serviceRemoteCommand(remoteBrokerInfo);
168        }
169    
170        @Override
171        public void start() throws Exception {
172            if (started.compareAndSet(false, true)) {
173    
174                if (brokerService == null) {
175                    throw new IllegalArgumentException("BrokerService is null on " + this);
176                }
177    
178                if (isDuplex()) {
179                    duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
180                    duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
181    
182                        @Override
183                        public void onCommand(Object o) {
184                            Command command = (Command) o;
185                            serviceLocalCommand(command);
186                        }
187    
188                        @Override
189                        public void onException(IOException error) {
190                            serviceLocalException(error);
191                        }
192                    });
193                    duplexInboundLocalBroker.start();
194                }
195    
196                localBroker.setTransportListener(new DefaultTransportListener() {
197    
198                    @Override
199                    public void onCommand(Object o) {
200                        Command command = (Command) o;
201                        serviceLocalCommand(command);
202                    }
203    
204                    @Override
205                    public void onException(IOException error) {
206                        if (!futureLocalBrokerInfo.isDone()) {
207                            futureLocalBrokerInfo.cancel(true);
208                            return;
209                        }
210                        serviceLocalException(error);
211                    }
212                });
213    
214                remoteBroker.setTransportListener(new DefaultTransportListener() {
215    
216                    @Override
217                    public void onCommand(Object o) {
218                        Command command = (Command) o;
219                        serviceRemoteCommand(command);
220                    }
221    
222                    @Override
223                    public void onException(IOException error) {
224                        if (!futureRemoteBrokerInfo.isDone()) {
225                            futureRemoteBrokerInfo.cancel(true);
226                            return;
227                        }
228                        serviceRemoteException(error);
229                    }
230                });
231    
232                remoteBroker.start();
233                localBroker.start();
234    
235                if (!disposed.get()) {
236                    try {
237                        triggerStartAsyncNetworkBridgeCreation();
238                    } catch (IOException e) {
239                        LOG.warn("Caught exception from remote start", e);
240                    }
241                } else {
242                    LOG.warn("Bridge was disposed before the start() method was fully executed.");
243                    throw new TransportDisposedIOException();
244                }
245            }
246        }
247    
248        @Override
249        public void stop() throws Exception {
250            if (started.compareAndSet(true, false)) {
251                if (disposed.compareAndSet(false, true)) {
252                    if (LOG.isDebugEnabled()) {
253                        LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
254                    }
255    
256                    futureRemoteBrokerInfo.cancel(true);
257                    futureLocalBrokerInfo.cancel(true);
258    
259                    NetworkBridgeListener l = this.networkBridgeListener;
260                    if (l != null) {
261                        l.onStop(this);
262                    }
263                    try {
264                        // local start complete
265                        if (startedLatch.getCount() < 2) {
266                            if (LOG.isTraceEnabled()) {
267                                LOG.trace(configuration.getBrokerName() + " unregister bridge (" + this + ") to " + remoteBrokerName);
268                            }
269                            brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
270                            brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
271                        }
272    
273                        remoteBridgeStarted.set(false);
274                        final CountDownLatch sendShutdown = new CountDownLatch(1);
275    
276                        brokerService.getTaskRunnerFactory().execute(new Runnable() {
277                            @Override
278                            public void run() {
279                                try {
280                                    serialExecutor.shutdown();
281                                    if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
282                                        List<Runnable> pendingTasks = serialExecutor.shutdownNow();
283                                        if (LOG.isInfoEnabled()) {
284                                            LOG.info("pending tasks on stop" + pendingTasks);
285                                        }
286                                    }
287                                    localBroker.oneway(new ShutdownInfo());
288                                    remoteBroker.oneway(new ShutdownInfo());
289                                } catch (Throwable e) {
290                                    if (LOG.isDebugEnabled()) {
291                                        LOG.debug("Caught exception sending shutdown", e);
292                                    }
293                                } finally {
294                                    sendShutdown.countDown();
295                                }
296    
297                            }
298                        }, "ActiveMQ ForwardingBridge StopTask");
299    
300                        if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
301                            LOG.info("Network Could not shutdown in a timely manner");
302                        }
303                    } finally {
304                        ServiceStopper ss = new ServiceStopper();
305                        ss.stop(remoteBroker);
306                        ss.stop(localBroker);
307                        ss.stop(duplexInboundLocalBroker);
308                        // Release the started Latch since another thread could be
309                        // stuck waiting for it to start up.
310                        startedLatch.countDown();
311                        startedLatch.countDown();
312                        localStartedLatch.countDown();
313    
314                        ss.throwFirstException();
315                    }
316                }
317    
318                if (LOG.isInfoEnabled()) {
319                    LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
320                }
321            }
322        }
323    
324        protected void triggerStartAsyncNetworkBridgeCreation() throws IOException {
325            brokerService.getTaskRunnerFactory().execute(new Runnable() {
326                @Override
327                public void run() {
328                    final String originalName = Thread.currentThread().getName();
329                    Thread.currentThread().setName("triggerStartAsyncNetworkBridgeCreation: " +
330                        "remoteBroker=" + remoteBroker + ", localBroker= " + localBroker);
331    
332                    try {
333                        // First we collect the info data from both the local and remote ends
334                        collectBrokerInfos();
335    
336                        // Once we have all required broker info we can attempt to start
337                        // the local and then remote sides of the bridge.
338                        doStartLocalAndRemoteBridges();
339                    } finally {
340                        Thread.currentThread().setName(originalName);
341                    }
342                }
343            });
344        }
345    
346        private void collectBrokerInfos() {
347    
348            // First wait for the remote to feed us its BrokerInfo, then we can check on
349            // the LocalBrokerInfo and decide is this is a loop.
350            try {
351                remoteBrokerInfo = futureRemoteBrokerInfo.get();
352                if (remoteBrokerInfo == null) {
353                    fireBridgeFailed();
354                }
355            } catch (Exception e) {
356                serviceRemoteException(e);
357                return;
358            }
359    
360            try {
361                localBrokerInfo = futureLocalBrokerInfo.get();
362                if (localBrokerInfo == null) {
363                    fireBridgeFailed();
364                }
365    
366                // Before we try and build the bridge lets check if we are in a loop
367                // and if so just stop now before registering anything.
368                if (localBrokerId.equals(remoteBrokerId)) {
369                     if (LOG.isTraceEnabled()) {
370                         LOG.trace(configuration.getBrokerName() +
371                             " disconnecting remote loop back connection for: " +
372                             remoteBrokerName + ", with id:" + remoteBrokerId);
373                     }
374                     ServiceSupport.dispose(localBroker);
375                     ServiceSupport.dispose(remoteBroker);
376                     return;
377                }
378    
379                // Fill in the remote broker's information now.
380                remoteBrokerId = remoteBrokerInfo.getBrokerId();
381                remoteBrokerPath[0] = remoteBrokerId;
382                remoteBrokerName = remoteBrokerInfo.getBrokerName();
383            } catch (Throwable e) {
384                serviceLocalException(e);
385            }
386        }
387    
388        private void doStartLocalAndRemoteBridges() {
389    
390            if (disposed.get()) {
391                return;
392            }
393    
394            if (isCreatedByDuplex()) {
395                // apply remote (propagated) configuration to local duplex bridge before start
396                Properties props = null;
397                try {
398                    props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
399                    IntrospectionSupport.getProperties(configuration, props, null);
400                    if (configuration.getExcludedDestinations() != null) {
401                        excludedDestinations = configuration.getExcludedDestinations().toArray(
402                                new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
403                    }
404                    if (configuration.getStaticallyIncludedDestinations() != null) {
405                        staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
406                                new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
407                    }
408                    if (configuration.getDynamicallyIncludedDestinations() != null) {
409                        dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations().toArray(
410                                new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations().size()]);
411                    }
412                } catch (Throwable t) {
413                    LOG.error("Error mapping remote configuration: " + props, t);
414                }
415            }
416    
417            try {
418                startLocalBridge();
419            } catch (Throwable e) {
420                serviceLocalException(e);
421                return;
422            }
423    
424            try {
425                startRemoteBridge();
426            } catch (Throwable e) {
427                serviceRemoteException(e);
428            }
429        }
430    
431        private void startLocalBridge() throws Throwable {
432            if (localBridgeStarted.compareAndSet(false, true)) {
433                synchronized (this) {
434                    if (LOG.isTraceEnabled()) {
435                        LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker);
436                    }
437                    if (!disposed.get()) {
438                        localConnectionInfo = new ConnectionInfo();
439                        localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
440                        localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
441                        localConnectionInfo.setClientId(localClientId);
442                        localConnectionInfo.setUserName(configuration.getUserName());
443                        localConnectionInfo.setPassword(configuration.getPassword());
444                        Transport originalTransport = remoteBroker;
445                        while (originalTransport instanceof TransportFilter) {
446                            originalTransport = ((TransportFilter) originalTransport).getNext();
447                        }
448                        if (originalTransport instanceof SslTransport) {
449                            X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
450                            localConnectionInfo.setTransportContext(peerCerts);
451                        }
452                        // sync requests that may fail
453                        Object resp = localBroker.request(localConnectionInfo);
454                        if (resp instanceof ExceptionResponse) {
455                            throw ((ExceptionResponse) resp).getException();
456                        }
457                        localSessionInfo = new SessionInfo(localConnectionInfo, 1);
458                        localBroker.oneway(localSessionInfo);
459    
460                        if (configuration.isDuplex()) {
461                            // separate in-bound channel for forwards so we don't
462                            // contend with out-bound dispatch on same connection
463                            ConnectionInfo duplexLocalConnectionInfo = new ConnectionInfo();
464                            duplexLocalConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
465                            duplexLocalConnectionInfo.setClientId(configuration.getName() + "_" + remoteBrokerName + "_inbound_duplex_"
466                                + configuration.getBrokerName());
467                            duplexLocalConnectionInfo.setUserName(configuration.getUserName());
468                            duplexLocalConnectionInfo.setPassword(configuration.getPassword());
469    
470                            if (originalTransport instanceof SslTransport) {
471                                X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
472                                duplexLocalConnectionInfo.setTransportContext(peerCerts);
473                            }
474                            // sync requests that may fail
475                            resp = duplexInboundLocalBroker.request(duplexLocalConnectionInfo);
476                            if (resp instanceof ExceptionResponse) {
477                                throw ((ExceptionResponse) resp).getException();
478                            }
479                            SessionInfo duplexInboundSession = new SessionInfo(duplexLocalConnectionInfo, 1);
480                            duplexInboundLocalProducerInfo = new ProducerInfo(duplexInboundSession, 1);
481                            duplexInboundLocalBroker.oneway(duplexInboundSession);
482                            duplexInboundLocalBroker.oneway(duplexInboundLocalProducerInfo);
483                        }
484                        brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
485                        NetworkBridgeListener l = this.networkBridgeListener;
486                        if (l != null) {
487                            l.onStart(this);
488                        }
489    
490                        // Let the local broker know the remote broker's ID.
491                        localBroker.oneway(remoteBrokerInfo);
492                        // new peer broker (a consumer can work with remote broker also)
493                        brokerService.getBroker().addBroker(null, remoteBrokerInfo);
494    
495                        if (LOG.isInfoEnabled()) {
496                            LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
497                            if (LOG.isTraceEnabled()) {
498                                LOG.trace(configuration.getBrokerName() + " register bridge (" + this + ") to " + remoteBrokerName);
499                            }
500                        }
501                    } else {
502                        LOG.warn("Bridge was disposed before the startLocalBridge() method was fully executed.");
503                    }
504                    startedLatch.countDown();
505                    localStartedLatch.countDown();
506                }
507    
508                if (!disposed.get()) {
509                    setupStaticDestinations();
510                } else {
511                    LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName
512                        + ") was interrupted during establishment.");
513                }
514            }
515        }
516    
517        protected void startRemoteBridge() throws Exception {
518            if (remoteBridgeStarted.compareAndSet(false, true)) {
519                if (LOG.isTraceEnabled()) {
520                    LOG.trace(configuration.getBrokerName() + " starting remote Bridge, remoteBroker=" + remoteBroker);
521                }
522                synchronized (this) {
523                    if (!isCreatedByDuplex()) {
524                        BrokerInfo brokerInfo = new BrokerInfo();
525                        brokerInfo.setBrokerName(configuration.getBrokerName());
526                        brokerInfo.setBrokerURL(configuration.getBrokerURL());
527                        brokerInfo.setNetworkConnection(true);
528                        brokerInfo.setDuplexConnection(configuration.isDuplex());
529                        // set our properties
530                        Properties props = new Properties();
531                        IntrospectionSupport.getProperties(configuration, props, null);
532                        String str = MarshallingSupport.propertiesToString(props);
533                        brokerInfo.setNetworkProperties(str);
534                        brokerInfo.setBrokerId(this.localBrokerId);
535                        remoteBroker.oneway(brokerInfo);
536                    }
537                    if (remoteConnectionInfo != null) {
538                        remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
539                    }
540                    remoteConnectionInfo = new ConnectionInfo();
541                    remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
542                    remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
543                    remoteConnectionInfo.setUserName(configuration.getUserName());
544                    remoteConnectionInfo.setPassword(configuration.getPassword());
545                    remoteBroker.oneway(remoteConnectionInfo);
546    
547                    SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
548                    remoteBroker.oneway(remoteSessionInfo);
549                    producerInfo = new ProducerInfo(remoteSessionInfo, 1);
550                    producerInfo.setResponseRequired(false);
551                    remoteBroker.oneway(producerInfo);
552                    // Listen to consumer advisory messages on the remote broker to determine demand.
553                    if (!configuration.isStaticBridge()) {
554                        demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
555                        // always dispatch advisory message asynchronously so that
556                        // we never block the producer broker if we are slow
557                        demandConsumerInfo.setDispatchAsync(true);
558                        String advisoryTopic = configuration.getDestinationFilter();
559                        if (configuration.isBridgeTempDestinations()) {
560                            advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
561                        }
562                        demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
563                        demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
564                        remoteBroker.oneway(demandConsumerInfo);
565                    }
566                    startedLatch.countDown();
567                }
568            }
569        }
570    
571        @Override
572        public void serviceRemoteException(Throwable error) {
573            if (!disposed.get()) {
574                if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
575                    LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
576                } else {
577                    LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
578                }
579                LOG.debug("The remote Exception was: " + error, error);
580                brokerService.getTaskRunnerFactory().execute(new Runnable() {
581                    @Override
582                    public void run() {
583                        ServiceSupport.dispose(getControllingService());
584                    }
585                });
586                fireBridgeFailed();
587            }
588        }
589    
590        protected void serviceRemoteCommand(Command command) {
591            if (!disposed.get()) {
592                try {
593                    if (command.isMessageDispatch()) {
594                        safeWaitUntilStarted();
595                        MessageDispatch md = (MessageDispatch) command;
596                        serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
597                        ackAdvisory(md.getMessage());
598                    } else if (command.isBrokerInfo()) {
599                        futureRemoteBrokerInfo.set((BrokerInfo) command);
600                    } else if (command.getClass() == ConnectionError.class) {
601                        ConnectionError ce = (ConnectionError) command;
602                        serviceRemoteException(ce.getException());
603                    } else {
604                        if (isDuplex()) {
605                            if (LOG.isTraceEnabled()) {
606                                LOG.trace(configuration.getBrokerName() + " duplex command type: " + command.getDataStructureType());
607                            }
608                            if (command.isMessage()) {
609                                final ActiveMQMessage message = (ActiveMQMessage) command;
610                                if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
611                                    || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
612                                    serviceRemoteConsumerAdvisory(message.getDataStructure());
613                                    ackAdvisory(message);
614                                } else {
615                                    if (!isPermissableDestination(message.getDestination(), true)) {
616                                        return;
617                                    }
618                                    // message being forwarded - we need to
619                                    // propagate the response to our local send
620                                    message.setProducerId(duplexInboundLocalProducerInfo.getProducerId());
621                                    if (message.isResponseRequired() || configuration.isAlwaysSyncSend()) {
622                                        duplexInboundLocalBroker.asyncRequest(message, new ResponseCallback() {
623                                            final int correlationId = message.getCommandId();
624    
625                                            @Override
626                                            public void onCompletion(FutureResponse resp) {
627                                                try {
628                                                    Response reply = resp.getResult();
629                                                    reply.setCorrelationId(correlationId);
630                                                    remoteBroker.oneway(reply);
631                                                } catch (IOException error) {
632                                                    LOG.error("Exception: " + error + " on duplex forward of: " + message);
633                                                    serviceRemoteException(error);
634                                                }
635                                            }
636                                        });
637                                    } else {
638                                        duplexInboundLocalBroker.oneway(message);
639                                    }
640                                }
641                            } else {
642                                switch (command.getDataStructureType()) {
643                                    case ConnectionInfo.DATA_STRUCTURE_TYPE:
644                                    case SessionInfo.DATA_STRUCTURE_TYPE:
645                                        localBroker.oneway(command);
646                                        break;
647                                    case ProducerInfo.DATA_STRUCTURE_TYPE:
648                                        // using duplexInboundLocalProducerInfo
649                                        break;
650                                    case MessageAck.DATA_STRUCTURE_TYPE:
651                                        MessageAck ack = (MessageAck) command;
652                                        DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
653                                        if (localSub != null) {
654                                            ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
655                                            localBroker.oneway(ack);
656                                        } else {
657                                            LOG.warn("Matching local subscription not found for ack: " + ack);
658                                        }
659                                        break;
660                                    case ConsumerInfo.DATA_STRUCTURE_TYPE:
661                                        localStartedLatch.await();
662                                        if (started.get()) {
663                                            if (!addConsumerInfo((ConsumerInfo) command)) {
664                                                if (LOG.isDebugEnabled()) {
665                                                    LOG.debug("Ignoring ConsumerInfo: " + command);
666                                                }
667                                            } else {
668                                                if (LOG.isTraceEnabled()) {
669                                                    LOG.trace("Adding ConsumerInfo: " + command);
670                                                }
671                                            }
672                                        } else {
673                                            // received a subscription whilst stopping
674                                            LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
675                                        }
676                                        break;
677                                    case ShutdownInfo.DATA_STRUCTURE_TYPE:
678                                        // initiator is shutting down, controlled case
679                                        // abortive close dealt with by inactivity monitor
680                                        LOG.info("Stopping network bridge on shutdown of remote broker");
681                                        serviceRemoteException(new IOException(command.toString()));
682                                        break;
683                                    default:
684                                        if (LOG.isDebugEnabled()) {
685                                            LOG.debug("Ignoring remote command: " + command);
686                                        }
687                                }
688                            }
689                        } else {
690                            switch (command.getDataStructureType()) {
691                                case KeepAliveInfo.DATA_STRUCTURE_TYPE:
692                                case WireFormatInfo.DATA_STRUCTURE_TYPE:
693                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
694                                    break;
695                                default:
696                                    LOG.warn("Unexpected remote command: " + command);
697                            }
698                        }
699                    }
700                } catch (Throwable e) {
701                    if (LOG.isDebugEnabled()) {
702                        LOG.debug("Exception processing remote command: " + command, e);
703                    }
704                    serviceRemoteException(e);
705                }
706            }
707        }
708    
709        private void ackAdvisory(Message message) throws IOException {
710            demandConsumerDispatched++;
711            if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
712                MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
713                ack.setConsumerId(demandConsumerInfo.getConsumerId());
714                remoteBroker.oneway(ack);
715                demandConsumerDispatched = 0;
716            }
717        }
718    
719        private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
720            final int networkTTL = configuration.getNetworkTTL();
721            if (data.getClass() == ConsumerInfo.class) {
722                // Create a new local subscription
723                ConsumerInfo info = (ConsumerInfo) data;
724                BrokerId[] path = info.getBrokerPath();
725    
726                if (info.isBrowser()) {
727                    if (LOG.isDebugEnabled()) {
728                        LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed");
729                    }
730                    return;
731                }
732    
733                if (path != null && path.length >= networkTTL) {
734                    if (LOG.isDebugEnabled()) {
735                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL
736                            + " network hops only : " + info);
737                    }
738                    return;
739                }
740    
741                if (contains(path, localBrokerPath[0])) {
742                    // Ignore this consumer as it's a consumer we locally sent to the broker.
743                    if (LOG.isDebugEnabled()) {
744                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
745                    }
746                    return;
747                }
748    
749                if (!isPermissableDestination(info.getDestination())) {
750                    // ignore if not in the permitted or in the excluded list
751                    if (LOG.isDebugEnabled()) {
752                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination()
753                            + " is not permiited :" + info);
754                    }
755                    return;
756                }
757    
758                // in a cyclic network there can be multiple bridges per broker that can propagate
759                // a network subscription so there is a need to synchronize on a shared entity
760                synchronized (brokerService.getVmConnectorURI()) {
761                    if (addConsumerInfo(info)) {
762                        if (LOG.isDebugEnabled()) {
763                            LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
764                        }
765                    } else {
766                        if (LOG.isDebugEnabled()) {
767                            LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName
768                                + " as already subscribed to matching destination : " + info);
769                        }
770                    }
771                }
772            } else if (data.getClass() == DestinationInfo.class) {
773                // It's a destination info - we want to pass up information about temporary destinations
774                final DestinationInfo destInfo = (DestinationInfo) data;
775                BrokerId[] path = destInfo.getBrokerPath();
776                if (path != null && path.length >= networkTTL) {
777                    if (LOG.isDebugEnabled()) {
778                        LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
779                    }
780                    return;
781                }
782                if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
783                    // Ignore this consumer as it's a consumer we locally sent to the broker.
784                    if (LOG.isDebugEnabled()) {
785                        LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
786                    }
787                    return;
788                }
789                destInfo.setConnectionId(localConnectionInfo.getConnectionId());
790                if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
791                    // re-set connection id so comes from here
792                    ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
793                    tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
794                }
795                destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
796                if (LOG.isTraceEnabled()) {
797                    LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker
798                        + " from " + remoteBrokerName + ", destination: " + destInfo);
799                }
800                if (destInfo.isRemoveOperation()) {
801                    // Serialize with removeSub operations such that all removeSub advisories
802                    // are generated
803                    serialExecutor.execute(new Runnable() {
804                        @Override
805                        public void run() {
806                            try {
807                                localBroker.oneway(destInfo);
808                            } catch (IOException e) {
809                                LOG.warn("failed to deliver remove command for destination:" + destInfo.getDestination(), e);
810                            }
811                        }
812                    });
813                } else {
814                    localBroker.oneway(destInfo);
815                }
816            } else if (data.getClass() == RemoveInfo.class) {
817                ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
818                removeDemandSubscription(id);
819            }
820        }
821    
822        @Override
823        public void serviceLocalException(Throwable error) {
824            serviceLocalException(null, error);
825        }
826    
827        public void serviceLocalException(MessageDispatch messageDispatch, Throwable error) {
828    
829            if (!disposed.get()) {
830                if (error instanceof DestinationDoesNotExistException && ((DestinationDoesNotExistException) error).isTemporary()) {
831                    // not a reason to terminate the bridge - temps can disappear with
832                    // pending sends as the demand sub may outlive the remote dest
833                    if (messageDispatch != null) {
834                        LOG.warn("PoisonAck of " + messageDispatch.getMessage().getMessageId() + " on forwarding error: " + error);
835                        try {
836                            MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
837                            poisonAck.setPoisonCause(error);
838                            localBroker.oneway(poisonAck);
839                        } catch (IOException ioe) {
840                            LOG.error("Failed to posion ack message following forward failure: " + ioe, ioe);
841                        }
842                        fireFailedForwardAdvisory(messageDispatch, error);
843                    } else {
844                        LOG.warn("Ignoring exception on forwarding to non existent temp dest: " + error, error);
845                    }
846                    return;
847                }
848    
849                if (LOG.isInfoEnabled()) {
850                    LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
851                }
852                if (LOG.isDebugEnabled()) {
853                    LOG.debug("The local Exception was:" + error, error);
854                }
855    
856                brokerService.getTaskRunnerFactory().execute(new Runnable() {
857                    @Override
858                    public void run() {
859                        ServiceSupport.dispose(getControllingService());
860                    }
861                });
862                fireBridgeFailed();
863            }
864        }
865    
866        private void fireFailedForwardAdvisory(MessageDispatch messageDispatch, Throwable error) {
867            if (configuration.isAdvisoryForFailedForward()) {
868                AdvisoryBroker advisoryBroker = null;
869                try {
870                    advisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
871    
872                    if (advisoryBroker != null) {
873                        ConnectionContext context = new ConnectionContext();
874                        context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
875                        context.setBroker(brokerService.getBroker());
876    
877                        ActiveMQMessage advisoryMessage = new ActiveMQMessage();
878                        advisoryMessage.setStringProperty("cause", error.getLocalizedMessage());
879                        advisoryBroker.fireAdvisory(context, AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic(), messageDispatch.getMessage(), null,
880                            advisoryMessage);
881    
882                    }
883                } catch (Exception e) {
884                    LOG.warn("failed to fire forward failure advisory, cause: " + e);
885                    if (LOG.isDebugEnabled()) {
886                        LOG.debug("detail", e);
887                    }
888                }
889            }
890        }
891    
892        protected Service getControllingService() {
893            return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
894        }
895    
896        protected void addSubscription(DemandSubscription sub) throws IOException {
897            if (sub != null) {
898                localBroker.oneway(sub.getLocalInfo());
899            }
900        }
901    
902        protected void removeSubscription(final DemandSubscription sub) throws IOException {
903            if (sub != null) {
904                if (LOG.isTraceEnabled()) {
905                    LOG.trace(configuration.getBrokerName() + " remove local subscription:" + sub.getLocalInfo().getConsumerId() + " for remote "
906                        + sub.getRemoteInfo().getConsumerId());
907                }
908    
909                // ensure not available for conduit subs pending removal
910                subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
911                subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
912    
913                // continue removal in separate thread to free up this thread for outstanding responses
914                // Serialize with removeDestination operations so that removeSubs are serialized with
915                // removeDestinations such that all removeSub advisories are generated
916                serialExecutor.execute(new Runnable() {
917                    @Override
918                    public void run() {
919                        sub.waitForCompletion();
920                        try {
921                            localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
922                        } catch (IOException e) {
923                            LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
924                        }
925                    }
926                });
927            }
928        }
929    
930        protected Message configureMessage(MessageDispatch md) throws IOException {
931            Message message = md.getMessage().copy();
932            // Update the packet to show where it came from.
933            message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
934            message.setProducerId(producerInfo.getProducerId());
935            message.setDestination(md.getDestination());
936            message.setMemoryUsage(null);
937            if (message.getOriginalTransactionId() == null) {
938                message.setOriginalTransactionId(message.getTransactionId());
939            }
940            message.setTransactionId(null);
941            if (configuration.isUseCompression()) {
942                message.compress();
943            }
944            return message;
945        }
946    
947        protected void serviceLocalCommand(Command command) {
948            if (!disposed.get()) {
949                try {
950                    if (command.isMessageDispatch()) {
951                        safeWaitUntilStarted();
952                        enqueueCounter.incrementAndGet();
953                        final MessageDispatch md = (MessageDispatch) command;
954                        final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
955                        if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
956    
957                            if (suppressMessageDispatch(md, sub)) {
958                                if (LOG.isDebugEnabled()) {
959                                    LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName
960                                        + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath())
961                                        + ", message: " + md.getMessage());
962                                }
963                                // still ack as it may be durable
964                                try {
965                                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
966                                } finally {
967                                    sub.decrementOutstandingResponses();
968                                }
969                                return;
970                            }
971    
972                            Message message = configureMessage(md);
973                            if (LOG.isDebugEnabled()) {
974                                LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") "
975                                    + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination "
976                                    + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
977                            }
978    
979                            if (isDuplex() && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
980                                try {
981                                    // never request b/c they are eventually acked async
982                                    remoteBroker.oneway(message);
983                                } finally {
984                                    sub.decrementOutstandingResponses();
985                                }
986                                return;
987                            }
988    
989                            if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
990    
991                                // The message was not sent using async send, so we should only
992                                // ack the local broker when we get confirmation that the remote
993                                // broker has received the message.
994                                remoteBroker.asyncRequest(message, new ResponseCallback() {
995                                    @Override
996                                    public void onCompletion(FutureResponse future) {
997                                        try {
998                                            Response response = future.getResult();
999                                            if (response.isException()) {
1000                                                ExceptionResponse er = (ExceptionResponse) response;
1001                                                serviceLocalException(md, er.getException());
1002                                            } else {
1003                                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1004                                                dequeueCounter.incrementAndGet();
1005                                            }
1006                                        } catch (IOException e) {
1007                                            serviceLocalException(md, e);
1008                                        } finally {
1009                                            sub.decrementOutstandingResponses();
1010                                        }
1011                                    }
1012                                });
1013    
1014                            } else {
1015                                // If the message was originally sent using async send, we will
1016                                // preserve that QOS by bridging it using an async send (small chance
1017                                // of message loss).
1018                                try {
1019                                    remoteBroker.oneway(message);
1020                                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
1021                                    dequeueCounter.incrementAndGet();
1022                                } finally {
1023                                    sub.decrementOutstandingResponses();
1024                                }
1025                            }
1026                        } else {
1027                            if (LOG.isDebugEnabled()) {
1028                                LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: "
1029                                    + md.getMessage());
1030                            }
1031                        }
1032                    } else if (command.isBrokerInfo()) {
1033                        futureLocalBrokerInfo.set((BrokerInfo) command);
1034                    } else if (command.isShutdownInfo()) {
1035                        LOG.info(configuration.getBrokerName() + " Shutting down");
1036                        stop();
1037                    } else if (command.getClass() == ConnectionError.class) {
1038                        ConnectionError ce = (ConnectionError) command;
1039                        serviceLocalException(ce.getException());
1040                    } else {
1041                        switch (command.getDataStructureType()) {
1042                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
1043                                break;
1044                            default:
1045                                LOG.warn("Unexpected local command: " + command);
1046                        }
1047                    }
1048                } catch (Throwable e) {
1049                    LOG.warn("Caught an exception processing local command", e);
1050                    serviceLocalException(e);
1051                }
1052            }
1053        }
1054    
1055        private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
1056            boolean suppress = false;
1057            // for durable subs, suppression via filter leaves dangling acks so we
1058            // need to check here and allow the ack irrespective
1059            if (sub.getLocalInfo().isDurable()) {
1060                MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
1061                messageEvalContext.setMessageReference(md.getMessage());
1062                messageEvalContext.setDestination(md.getDestination());
1063                suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
1064            }
1065            return suppress;
1066        }
1067    
1068        public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
1069            if (brokerPath != null) {
1070                for (BrokerId id : brokerPath) {
1071                    if (brokerId.equals(id)) {
1072                        return true;
1073                    }
1074                }
1075            }
1076            return false;
1077        }
1078    
1079        protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
1080            if (brokerPath == null || brokerPath.length == 0) {
1081                return pathsToAppend;
1082            }
1083            BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
1084            System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1085            System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
1086            return rc;
1087        }
1088    
1089        protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
1090            if (brokerPath == null || brokerPath.length == 0) {
1091                return new BrokerId[] { idToAppend };
1092            }
1093            BrokerId rc[] = new BrokerId[brokerPath.length + 1];
1094            System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
1095            rc[brokerPath.length] = idToAppend;
1096            return rc;
1097        }
1098    
1099        protected boolean isPermissableDestination(ActiveMQDestination destination) {
1100            return isPermissableDestination(destination, false);
1101        }
1102    
1103        protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
1104            // Are we not bridging temporary destinations?
1105            if (destination.isTemporary()) {
1106                if (allowTemporary) {
1107                    return true;
1108                } else {
1109                    return configuration.isBridgeTempDestinations();
1110                }
1111            }
1112    
1113            ActiveMQDestination[] dests = staticallyIncludedDestinations;
1114            if (dests != null && dests.length > 0) {
1115                for (ActiveMQDestination dest : dests) {
1116                    DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1117                    if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1118                        return true;
1119                    }
1120                }
1121            }
1122    
1123            dests = excludedDestinations;
1124            if (dests != null && dests.length > 0) {
1125                for (ActiveMQDestination dest : dests) {
1126                    DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
1127                    if (dest != null && exclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1128                        return false;
1129                    }
1130                }
1131            }
1132    
1133            dests = dynamicallyIncludedDestinations;
1134            if (dests != null && dests.length > 0) {
1135                for (ActiveMQDestination dest : dests) {
1136                    DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
1137                    if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
1138                        return true;
1139                    }
1140                }
1141    
1142                return false;
1143            }
1144            return true;
1145        }
1146    
1147        /**
1148         * Subscriptions for these destinations are always created
1149         */
1150        protected void setupStaticDestinations() {
1151            ActiveMQDestination[] dests = staticallyIncludedDestinations;
1152            if (dests != null) {
1153                for (ActiveMQDestination dest : dests) {
1154                    DemandSubscription sub = createDemandSubscription(dest);
1155                    try {
1156                        addSubscription(sub);
1157                    } catch (IOException e) {
1158                        LOG.error("Failed to add static destination " + dest, e);
1159                    }
1160                    if (LOG.isTraceEnabled()) {
1161                        LOG.trace(configuration.getBrokerName() + ", bridging messages for static destination: " + dest);
1162                    }
1163                }
1164            }
1165        }
1166    
1167        protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
1168            boolean consumerAdded = false;
1169            ConsumerInfo info = consumerInfo.copy();
1170            addRemoteBrokerToBrokerPath(info);
1171            DemandSubscription sub = createDemandSubscription(info);
1172            if (sub != null) {
1173                if (duplicateSuppressionIsRequired(sub)) {
1174                    undoMapRegistration(sub);
1175                } else {
1176                    addSubscription(sub);
1177                    consumerAdded = true;
1178                }
1179            }
1180            return consumerAdded;
1181        }
1182    
1183        private void undoMapRegistration(DemandSubscription sub) {
1184            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1185            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1186        }
1187    
1188        /*
1189         * check our existing subs networkConsumerIds against the list of network
1190         * ids in this subscription A match means a duplicate which we suppress for
1191         * topics and maybe for queues
1192         */
1193        private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1194            final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1195            boolean suppress = false;
1196    
1197            if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() || consumerInfo.getDestination().isTopic()
1198                && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1199                return suppress;
1200            }
1201    
1202            List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1203            Collection<Subscription> currentSubs = getRegionSubscriptions(consumerInfo.getDestination());
1204            for (Subscription sub : currentSubs) {
1205                List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1206                if (!networkConsumers.isEmpty()) {
1207                    if (matchFound(candidateConsumers, networkConsumers)) {
1208                        if (isInActiveDurableSub(sub)) {
1209                            suppress = false;
1210                        } else {
1211                            suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1212                        }
1213                        break;
1214                    }
1215                }
1216            }
1217            return suppress;
1218        }
1219    
1220        private boolean isInActiveDurableSub(Subscription sub) {
1221            return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription) sub).isActive());
1222        }
1223    
1224        private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1225            boolean suppress = false;
1226    
1227            if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1228                if (LOG.isDebugEnabled()) {
1229                    LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName + ", sub: " + candidateInfo
1230                        + " is duplicated by network subscription with equal or higher network priority: " + existingSub + ", networkConsumerIds: "
1231                        + existingSub.getConsumerInfo().getNetworkConsumerIds());
1232                }
1233                suppress = true;
1234            } else {
1235                // remove the existing lower priority duplicate and allow this candidate
1236                try {
1237                    removeDuplicateSubscription(existingSub);
1238    
1239                    if (LOG.isDebugEnabled()) {
1240                        LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo() + " with sub from "
1241                            + remoteBrokerName + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
1242                            + candidateInfo.getNetworkConsumerIds());
1243                    }
1244                } catch (IOException e) {
1245                    LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
1246                }
1247            }
1248            return suppress;
1249        }
1250    
1251        private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1252            for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1253                if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1254                    break;
1255                }
1256            }
1257        }
1258    
1259        private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1260            boolean found = false;
1261            for (ConsumerId aliasConsumer : networkConsumers) {
1262                if (candidateConsumers.contains(aliasConsumer)) {
1263                    found = true;
1264                    break;
1265                }
1266            }
1267            return found;
1268        }
1269    
1270        private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1271            RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1272            Region region;
1273            Collection<Subscription> subs;
1274    
1275            region = null;
1276            switch (dest.getDestinationType()) {
1277                case ActiveMQDestination.QUEUE_TYPE:
1278                    region = region_broker.getQueueRegion();
1279                    break;
1280                case ActiveMQDestination.TOPIC_TYPE:
1281                    region = region_broker.getTopicRegion();
1282                    break;
1283                case ActiveMQDestination.TEMP_QUEUE_TYPE:
1284                    region = region_broker.getTempQueueRegion();
1285                    break;
1286                case ActiveMQDestination.TEMP_TOPIC_TYPE:
1287                    region = region_broker.getTempTopicRegion();
1288                    break;
1289            }
1290    
1291            if (region instanceof AbstractRegion) {
1292                subs = ((AbstractRegion) region).getSubscriptions().values();
1293            } else {
1294                subs = null;
1295            }
1296    
1297            return subs;
1298        }
1299    
1300        protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1301            // add our original id to ourselves
1302            info.addNetworkConsumerId(info.getConsumerId());
1303            return doCreateDemandSubscription(info);
1304        }
1305    
1306        protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1307            DemandSubscription result = new DemandSubscription(info);
1308            result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1309            if (info.getDestination().isTemporary()) {
1310                // reset the local connection Id
1311                ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1312                dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1313            }
1314    
1315            if (configuration.isDecreaseNetworkConsumerPriority()) {
1316                byte priority = (byte) configuration.getConsumerPriorityBase();
1317                if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1318                    // The longer the path to the consumer, the less it's consumer priority.
1319                    priority -= info.getBrokerPath().length + 1;
1320                }
1321                result.getLocalInfo().setPriority(priority);
1322                if (LOG.isDebugEnabled()) {
1323                    LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
1324                }
1325            }
1326            configureDemandSubscription(info, result);
1327            return result;
1328        }
1329    
1330        final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1331            ConsumerInfo info = new ConsumerInfo();
1332            info.setNetworkSubscription(true);
1333            info.setDestination(destination);
1334    
1335            // Indicate that this subscription is being made on behalf of the remote broker.
1336            info.setBrokerPath(new BrokerId[] { remoteBrokerId });
1337    
1338            // the remote info held by the DemandSubscription holds the original
1339            // consumerId, the local info get's overwritten
1340            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1341            DemandSubscription result = null;
1342            try {
1343                result = createDemandSubscription(info);
1344            } catch (IOException e) {
1345                LOG.error("Failed to create DemandSubscription ", e);
1346            }
1347            return result;
1348        }
1349    
1350        protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1351            if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
1352                sub.getLocalInfo().setDispatchAsync(true);
1353            } else {
1354                sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1355            }
1356            sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1357            subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1358            subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1359    
1360            sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1361            if (!info.isDurable()) {
1362                // This works for now since we use a VM connection to the local broker.
1363                // may need to change if we ever subscribe to a remote broker.
1364                sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1365            } else {
1366                // need to ack this message if it is ignored as it is durable so
1367                // we check before we send. see: suppressMessageDispatch()
1368            }
1369        }
1370    
1371        protected void removeDemandSubscription(ConsumerId id) throws IOException {
1372            DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1373            if (LOG.isDebugEnabled()) {
1374                LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id
1375                    + ", matching sub: " + sub);
1376            }
1377            if (sub != null) {
1378                removeSubscription(sub);
1379                if (LOG.isDebugEnabled()) {
1380                    LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " :  " + sub.getRemoteInfo());
1381                }
1382            }
1383        }
1384    
1385        protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1386            boolean removeDone = false;
1387            DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1388            if (sub != null) {
1389                try {
1390                    removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1391                    removeDone = true;
1392                } catch (IOException e) {
1393                    LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
1394                }
1395            }
1396            return removeDone;
1397        }
1398    
1399        /**
1400         * Performs a timed wait on the started latch and then checks for disposed
1401         * before performing another wait each time the the started wait times out.
1402         *
1403         * @throws InterruptedException
1404         */
1405        protected void safeWaitUntilStarted() throws InterruptedException {
1406            while (!disposed.get()) {
1407                if (startedLatch.await(1, TimeUnit.SECONDS)) {
1408                    return;
1409                }
1410            }
1411        }
1412    
1413        protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1414            NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1415            if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1416                PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1417                if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1418                    filterFactory = entry.getNetworkBridgeFilterFactory();
1419                }
1420            }
1421            return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL());
1422        }
1423    
1424        protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1425            info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1426        }
1427    
1428        protected BrokerId[] getRemoteBrokerPath() {
1429            return remoteBrokerPath;
1430        }
1431    
1432        @Override
1433        public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1434            this.networkBridgeListener = listener;
1435        }
1436    
1437        private void fireBridgeFailed() {
1438            NetworkBridgeListener l = this.networkBridgeListener;
1439            if (l != null && this.bridgeFailed.compareAndSet(false, true)) {
1440                l.bridgeFailed();
1441            }
1442        }
1443    
1444        /**
1445         * @return Returns the dynamicallyIncludedDestinations.
1446         */
1447        public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
1448            return dynamicallyIncludedDestinations;
1449        }
1450    
1451        /**
1452         * @param dynamicallyIncludedDestinations
1453         *            The dynamicallyIncludedDestinations to set.
1454         */
1455        public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
1456            this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
1457        }
1458    
1459        /**
1460         * @return Returns the excludedDestinations.
1461         */
1462        public ActiveMQDestination[] getExcludedDestinations() {
1463            return excludedDestinations;
1464        }
1465    
1466        /**
1467         * @param excludedDestinations
1468         *            The excludedDestinations to set.
1469         */
1470        public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
1471            this.excludedDestinations = excludedDestinations;
1472        }
1473    
1474        /**
1475         * @return Returns the staticallyIncludedDestinations.
1476         */
1477        public ActiveMQDestination[] getStaticallyIncludedDestinations() {
1478            return staticallyIncludedDestinations;
1479        }
1480    
1481        /**
1482         * @param staticallyIncludedDestinations
1483         *            The staticallyIncludedDestinations to set.
1484         */
1485        public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
1486            this.staticallyIncludedDestinations = staticallyIncludedDestinations;
1487        }
1488    
1489        /**
1490         * @return Returns the durableDestinations.
1491         */
1492        public ActiveMQDestination[] getDurableDestinations() {
1493            return durableDestinations;
1494        }
1495    
1496        /**
1497         * @param durableDestinations
1498         *            The durableDestinations to set.
1499         */
1500        public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
1501            this.durableDestinations = durableDestinations;
1502        }
1503    
1504        /**
1505         * @return Returns the localBroker.
1506         */
1507        public Transport getLocalBroker() {
1508            return localBroker;
1509        }
1510    
1511        /**
1512         * @return Returns the remoteBroker.
1513         */
1514        public Transport getRemoteBroker() {
1515            return remoteBroker;
1516        }
1517    
1518        /**
1519         * @return the createdByDuplex
1520         */
1521        public boolean isCreatedByDuplex() {
1522            return this.createdByDuplex;
1523        }
1524    
1525        /**
1526         * @param createdByDuplex
1527         *            the createdByDuplex to set
1528         */
1529        public void setCreatedByDuplex(boolean createdByDuplex) {
1530            this.createdByDuplex = createdByDuplex;
1531        }
1532    
1533        @Override
1534        public String getRemoteAddress() {
1535            return remoteBroker.getRemoteAddress();
1536        }
1537    
1538        @Override
1539        public String getLocalAddress() {
1540            return localBroker.getRemoteAddress();
1541        }
1542    
1543        @Override
1544        public String getRemoteBrokerName() {
1545            return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1546        }
1547    
1548        @Override
1549        public String getLocalBrokerName() {
1550            return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1551        }
1552    
1553        @Override
1554        public long getDequeueCounter() {
1555            return dequeueCounter.get();
1556        }
1557    
1558        @Override
1559        public long getEnqueueCounter() {
1560            return enqueueCounter.get();
1561        }
1562    
1563        protected boolean isDuplex() {
1564            return configuration.isDuplex() || createdByDuplex;
1565        }
1566    
1567        public ConcurrentHashMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1568            return subscriptionMapByRemoteId;
1569        }
1570    
1571        @Override
1572        public void setBrokerService(BrokerService brokerService) {
1573            this.brokerService = brokerService;
1574            this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1575            localBrokerPath[0] = localBrokerId;
1576        }
1577    
1578        @Override
1579        public void setMbeanObjectName(ObjectName objectName) {
1580            this.mbeanObjectName = objectName;
1581        }
1582    
1583        @Override
1584        public ObjectName getMbeanObjectName() {
1585            return mbeanObjectName;
1586        }
1587    
1588        /*
1589         * Used to allow for async tasks to await receipt of the BrokerInfo from the local and
1590         * remote sides of the network bridge.
1591         */
1592        private static class FutureBrokerInfo implements Future<BrokerInfo> {
1593    
1594            private final CountDownLatch slot = new CountDownLatch(1);
1595            private final AtomicBoolean disposed;
1596            private BrokerInfo info = null;
1597    
1598            public FutureBrokerInfo(BrokerInfo info, AtomicBoolean disposed) {
1599                this.info = info;
1600                this.disposed = disposed;
1601            }
1602    
1603            @Override
1604            public boolean cancel(boolean mayInterruptIfRunning) {
1605                slot.countDown();
1606                return true;
1607            }
1608    
1609            @Override
1610            public boolean isCancelled() {
1611                return slot.getCount() == 0 && info == null;
1612            }
1613    
1614            @Override
1615            public boolean isDone() {
1616                return info != null;
1617            }
1618    
1619            @Override
1620            public BrokerInfo get() throws InterruptedException, ExecutionException {
1621                try {
1622                    if (info == null) {
1623                        while (!disposed.get()) {
1624                            if (slot.await(1, TimeUnit.SECONDS)) {
1625                                break;
1626                            }
1627                        }
1628                    }
1629                    return info;
1630                } catch (InterruptedException e) {
1631                    Thread.currentThread().interrupt();
1632                    if (LOG.isDebugEnabled()) {
1633                        LOG.debug("Operation interupted: " + e, e);
1634                    }
1635                    throw new InterruptedException("Interrupted.");
1636                }
1637            }
1638    
1639            @Override
1640            public BrokerInfo get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1641                try {
1642                    if (info == null) {
1643                        long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
1644    
1645                        while (!disposed.get() || System.currentTimeMillis() < deadline) {
1646                            if (slot.await(1, TimeUnit.MILLISECONDS)) {
1647                                break;
1648                            }
1649                        }
1650                        if (info == null) {
1651                            throw new TimeoutException();
1652                        }
1653                    }
1654                    return info;
1655                } catch (InterruptedException e) {
1656                    throw new InterruptedException("Interrupted.");
1657                }
1658            }
1659    
1660            public void set(BrokerInfo info) {
1661                this.info = info;
1662                this.slot.countDown();
1663            }
1664        }
1665    
1666    }