001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.io.IOException;
020    import java.security.GeneralSecurityException;
021    import java.security.cert.X509Certificate;
022    import java.util.Arrays;
023    import java.util.Collection;
024    import java.util.List;
025    import java.util.Properties;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.CountDownLatch;
028    import java.util.concurrent.TimeUnit;
029    import java.util.concurrent.atomic.AtomicBoolean;
030    import java.util.concurrent.atomic.AtomicLong;
031    
032    import javax.management.ObjectName;
033    
034    import org.apache.activemq.Service;
035    import org.apache.activemq.advisory.AdvisorySupport;
036    import org.apache.activemq.broker.BrokerService;
037    import org.apache.activemq.broker.BrokerServiceAware;
038    import org.apache.activemq.broker.TransportConnection;
039    import org.apache.activemq.broker.region.AbstractRegion;
040    import org.apache.activemq.broker.region.DurableTopicSubscription;
041    import org.apache.activemq.broker.region.Region;
042    import org.apache.activemq.broker.region.RegionBroker;
043    import org.apache.activemq.broker.region.Subscription;
044    import org.apache.activemq.broker.region.policy.PolicyEntry;
045    import org.apache.activemq.command.ActiveMQDestination;
046    import org.apache.activemq.command.ActiveMQMessage;
047    import org.apache.activemq.command.ActiveMQTempDestination;
048    import org.apache.activemq.command.ActiveMQTopic;
049    import org.apache.activemq.command.BrokerId;
050    import org.apache.activemq.command.BrokerInfo;
051    import org.apache.activemq.command.Command;
052    import org.apache.activemq.command.ConnectionError;
053    import org.apache.activemq.command.ConnectionId;
054    import org.apache.activemq.command.ConnectionInfo;
055    import org.apache.activemq.command.ConsumerId;
056    import org.apache.activemq.command.ConsumerInfo;
057    import org.apache.activemq.command.DataStructure;
058    import org.apache.activemq.command.DestinationInfo;
059    import org.apache.activemq.command.ExceptionResponse;
060    import org.apache.activemq.command.KeepAliveInfo;
061    import org.apache.activemq.command.Message;
062    import org.apache.activemq.command.MessageAck;
063    import org.apache.activemq.command.MessageDispatch;
064    import org.apache.activemq.command.NetworkBridgeFilter;
065    import org.apache.activemq.command.ProducerInfo;
066    import org.apache.activemq.command.RemoveInfo;
067    import org.apache.activemq.command.Response;
068    import org.apache.activemq.command.SessionInfo;
069    import org.apache.activemq.command.ShutdownInfo;
070    import org.apache.activemq.command.WireFormatInfo;
071    import org.apache.activemq.filter.DestinationFilter;
072    import org.apache.activemq.filter.MessageEvaluationContext;
073    import org.apache.activemq.transport.DefaultTransportListener;
074    import org.apache.activemq.transport.FutureResponse;
075    import org.apache.activemq.transport.ResponseCallback;
076    import org.apache.activemq.transport.Transport;
077    import org.apache.activemq.transport.TransportDisposedIOException;
078    import org.apache.activemq.transport.TransportFilter;
079    import org.apache.activemq.transport.tcp.SslTransport;
080    import org.apache.activemq.util.IdGenerator;
081    import org.apache.activemq.util.IntrospectionSupport;
082    import org.apache.activemq.util.LongSequenceGenerator;
083    import org.apache.activemq.util.MarshallingSupport;
084    import org.apache.activemq.util.ServiceStopper;
085    import org.apache.activemq.util.ServiceSupport;
086    import org.slf4j.Logger;
087    import org.slf4j.LoggerFactory;
088    
089    /**
090     * A useful base class for implementing demand forwarding bridges.
091     */
092    public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
093        private static final Logger LOG = LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
094        protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
095        protected final Transport localBroker;
096        protected final Transport remoteBroker;
097        protected final IdGenerator idGenerator = new IdGenerator();
098        protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
099        protected ConnectionInfo localConnectionInfo;
100        protected ConnectionInfo remoteConnectionInfo;
101        protected SessionInfo localSessionInfo;
102        protected ProducerInfo producerInfo;
103        protected String remoteBrokerName = "Unknown";
104        protected String localClientId;
105        protected ConsumerInfo demandConsumerInfo;
106        protected int demandConsumerDispatched;
107        protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
108        protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
109        protected AtomicBoolean disposed = new AtomicBoolean();
110        protected BrokerId localBrokerId;
111        protected ActiveMQDestination[] excludedDestinations;
112        protected ActiveMQDestination[] dynamicallyIncludedDestinations;
113        protected ActiveMQDestination[] staticallyIncludedDestinations;
114        protected ActiveMQDestination[] durableDestinations;
115        protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
116        protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
117        protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
118        protected final CountDownLatch startedLatch = new CountDownLatch(2);
119        protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
120        protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
121        protected NetworkBridgeConfiguration configuration;
122        protected final NetworkBridgeFilterFactory defaultFilterFactory = new DefaultNetworkBridgeFilterFactory();
123    
124        protected final BrokerId remoteBrokerPath[] = new BrokerId[] {null};
125        protected Object brokerInfoMutex = new Object();
126        protected BrokerId remoteBrokerId;
127    
128        final AtomicLong enqueueCounter = new AtomicLong();
129        final AtomicLong dequeueCounter = new AtomicLong();
130    
131        private NetworkBridgeListener networkBridgeListener;
132        private boolean createdByDuplex;
133        private BrokerInfo localBrokerInfo;
134        private BrokerInfo remoteBrokerInfo;
135    
136        private final AtomicBoolean started = new AtomicBoolean();
137        private TransportConnection duplexInitiatingConnection;
138        private BrokerService brokerService = null;
139        private ObjectName mbeanObjectName;
140    
141        public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
142            this.configuration = configuration;
143            this.localBroker = localBroker;
144            this.remoteBroker = remoteBroker;
145        }
146    
147        public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
148            this.localBrokerInfo = localBrokerInfo;
149            this.remoteBrokerInfo = remoteBrokerInfo;
150            this.duplexInitiatingConnection = connection;
151            start();
152            serviceRemoteCommand(remoteBrokerInfo);
153        }
154    
155        public void start() throws Exception {
156            if (started.compareAndSet(false, true)) {
157    
158                if (brokerService == null) {
159                    throw new IllegalArgumentException("BrokerService is null on " + this);
160                }
161    
162                localBroker.setTransportListener(new DefaultTransportListener() {
163    
164                    @Override
165                    public void onCommand(Object o) {
166                        Command command = (Command) o;
167                        serviceLocalCommand(command);
168                    }
169    
170                    @Override
171                    public void onException(IOException error) {
172                        serviceLocalException(error);
173                    }
174                });
175                remoteBroker.setTransportListener(new DefaultTransportListener() {
176    
177                    public void onCommand(Object o) {
178                        Command command = (Command) o;
179                        serviceRemoteCommand(command);
180                    }
181    
182                    public void onException(IOException error) {
183                        serviceRemoteException(error);
184                    }
185    
186                });
187    
188                localBroker.start();
189                remoteBroker.start();
190                if (!disposed.get()) {
191                    try {
192                        triggerRemoteStartBridge();
193                    } catch (IOException e) {
194                        LOG.warn("Caught exception from remote start", e);
195                    }
196                } else {
197                    LOG.warn ("Bridge was disposed before the start() method was fully executed.");
198                    throw new TransportDisposedIOException();
199                }
200            }
201        }
202    
203        protected void triggerLocalStartBridge() throws IOException {
204            brokerService.getTaskRunnerFactory().execute(new Runnable() {
205                public void run() {
206                    final String originalName = Thread.currentThread().getName();
207                    Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
208                    try {
209                        startLocalBridge();
210                    } catch (Throwable e) {
211                        serviceLocalException(e);
212                    } finally {
213                        Thread.currentThread().setName(originalName);
214                    }
215                }
216            });
217        }
218    
219        protected void triggerRemoteStartBridge() throws IOException {
220            brokerService.getTaskRunnerFactory().execute(new Runnable() {
221                public void run() {
222                    final String originalName = Thread.currentThread().getName();
223                    Thread.currentThread().setName("StartRemoteBridge: remoteBroker=" + remoteBroker);
224                    try {
225                        startRemoteBridge();
226                    } catch (Exception e) {
227                        serviceRemoteException(e);
228                    } finally {
229                        Thread.currentThread().setName(originalName);
230                    }
231                }
232            });
233        }
234    
235        private void startLocalBridge() throws Throwable {
236            if (localBridgeStarted.compareAndSet(false, true)) {
237                synchronized (this) {
238                    if (LOG.isTraceEnabled()) {
239                        LOG.trace(configuration.getBrokerName() + " starting local Bridge, localBroker=" + localBroker);
240                    }
241                    if (!disposed.get()) {
242                        localConnectionInfo = new ConnectionInfo();
243                        localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
244                        localClientId = configuration.getName() + "_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
245                        localConnectionInfo.setClientId(localClientId);
246                        localConnectionInfo.setUserName(configuration.getUserName());
247                        localConnectionInfo.setPassword(configuration.getPassword());
248                        Transport originalTransport = remoteBroker;
249                        while (originalTransport instanceof TransportFilter) {
250                            originalTransport = ((TransportFilter) originalTransport).getNext();
251                        }
252                        if (originalTransport instanceof SslTransport) {
253                            X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
254                            localConnectionInfo.setTransportContext(peerCerts);
255                        }
256                        // sync requests that may fail
257                        Object resp = localBroker.request(localConnectionInfo);
258                        if (resp instanceof ExceptionResponse) {
259                            throw ((ExceptionResponse)resp).getException();
260                        }
261                        localSessionInfo = new SessionInfo(localConnectionInfo, 1);
262                        localBroker.oneway(localSessionInfo);
263    
264                        brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex, remoteBroker.toString());
265                        NetworkBridgeListener l = this.networkBridgeListener;
266                        if (l != null) {
267                            l.onStart(this);
268                        }
269                        LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
270    
271                    } else {
272                        LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed.");
273                    }
274                    startedLatch.countDown();
275                    localStartedLatch.countDown();
276                }
277    
278                safeWaitUntilStarted();
279    
280                if (!disposed.get()) {
281                    setupStaticDestinations();
282                } else {
283                    LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment.");
284                }
285            }
286        }
287    
288        protected void startRemoteBridge() throws Exception {
289            if (remoteBridgeStarted.compareAndSet(false, true)) {
290                if (LOG.isTraceEnabled()) {
291                    LOG.trace(configuration.getBrokerName() + " starting remote Bridge, remoteBroker=" + remoteBroker);
292                }
293                synchronized (this) {
294                    if (!isCreatedByDuplex()) {
295                        BrokerInfo brokerInfo = new BrokerInfo();
296                        brokerInfo.setBrokerName(configuration.getBrokerName());
297                        brokerInfo.setBrokerURL(configuration.getBrokerURL());
298                        brokerInfo.setNetworkConnection(true);
299                        brokerInfo.setDuplexConnection(configuration.isDuplex());
300                        // set our properties
301                        Properties props = new Properties();
302                        IntrospectionSupport.getProperties(configuration, props, null);
303                        String str = MarshallingSupport.propertiesToString(props);
304                        brokerInfo.setNetworkProperties(str);
305                        brokerInfo.setBrokerId(this.localBrokerId);
306                        remoteBroker.oneway(brokerInfo);
307                    }
308                    if (remoteConnectionInfo != null) {
309                        remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
310                    }
311                    remoteConnectionInfo = new ConnectionInfo();
312                    remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
313                    remoteConnectionInfo.setClientId(configuration.getName() + "_" + configuration.getBrokerName() + "_outbound");
314                    remoteConnectionInfo.setUserName(configuration.getUserName());
315                    remoteConnectionInfo.setPassword(configuration.getPassword());
316                    remoteBroker.oneway(remoteConnectionInfo);
317    
318                    SessionInfo remoteSessionInfo = new SessionInfo(remoteConnectionInfo, 1);
319                    remoteBroker.oneway(remoteSessionInfo);
320                    producerInfo = new ProducerInfo(remoteSessionInfo, 1);
321                    producerInfo.setResponseRequired(false);
322                    remoteBroker.oneway(producerInfo);
323                    // Listen to consumer advisory messages on the remote broker to
324                    // determine demand.
325                    if (!configuration.isStaticBridge()) {
326                        demandConsumerInfo = new ConsumerInfo(remoteSessionInfo, 1);
327                        // always dispatch advisory message asynchronously so that we never block the producer
328                        // broker if we are slow
329                        demandConsumerInfo.setDispatchAsync(true);
330                        String advisoryTopic = configuration.getDestinationFilter();
331                        if (configuration.isBridgeTempDestinations()) {
332                            advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
333                        }
334                        demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
335                        demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
336                        remoteBroker.oneway(demandConsumerInfo);
337                    }
338                    startedLatch.countDown();
339                }
340            }
341        }
342    
343        public void stop() throws Exception {
344            if (started.compareAndSet(true, false)) {
345                if (disposed.compareAndSet(false, true)) {
346                    LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
347                    NetworkBridgeListener l = this.networkBridgeListener;
348                    if (l != null) {
349                        l.onStop(this);
350                    }
351                    try {
352                        remoteBridgeStarted.set(false);
353                        final CountDownLatch sendShutdown = new CountDownLatch(1);
354    
355                        brokerService.getTaskRunnerFactory().execute(new Runnable() {
356                            public void run() {
357                                try {
358                                    localBroker.oneway(new ShutdownInfo());
359                                    sendShutdown.countDown();
360                                    remoteBroker.oneway(new ShutdownInfo());
361                                } catch (Throwable e) {
362                                    LOG.debug("Caught exception sending shutdown", e);
363                                } finally {
364                                    sendShutdown.countDown();
365                                }
366    
367                            }
368                        }, "ActiveMQ ForwardingBridge StopTask");
369    
370                        if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
371                            LOG.info("Network Could not shutdown in a timely manner");
372                        }
373                    } finally {
374                        ServiceStopper ss = new ServiceStopper();
375                        ss.stop(remoteBroker);
376                        ss.stop(localBroker);
377                        // Release the started Latch since another thread could be
378                        // stuck waiting for it to start up.
379                        startedLatch.countDown();
380                        startedLatch.countDown();
381                        localStartedLatch.countDown();
382    
383                        ss.throwFirstException();
384                    }
385                }
386    
387                if (remoteBrokerInfo != null) {
388                    brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
389                    brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
390                    LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
391                }
392            }
393        }
394    
395        public void serviceRemoteException(Throwable error) {
396            if (!disposed.get()) {
397                if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
398                    LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
399                } else {
400                    LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
401                }
402                LOG.debug("The remote Exception was: " + error, error);
403                brokerService.getTaskRunnerFactory().execute(new Runnable() {
404                    public void run() {
405                        ServiceSupport.dispose(getControllingService());
406                    }
407                });
408                fireBridgeFailed();
409            }
410        }
411    
412        protected void serviceRemoteCommand(Command command) {
413            if (!disposed.get()) {
414                try {
415                    if (command.isMessageDispatch()) {
416                        safeWaitUntilStarted();
417                        MessageDispatch md = (MessageDispatch) command;
418                        serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
419                        ackAdvisory(md.getMessage());
420                    } else if (command.isBrokerInfo()) {
421                        lastConnectSucceeded.set(true);
422                        remoteBrokerInfo = (BrokerInfo) command;
423                        Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
424                        try {
425                            IntrospectionSupport.getProperties(configuration, props, null);
426                            if (configuration.getExcludedDestinations() != null) {
427                                excludedDestinations = configuration.getExcludedDestinations().toArray(
428                                        new ActiveMQDestination[configuration.getExcludedDestinations().size()]);
429                            }
430                            if (configuration.getStaticallyIncludedDestinations() != null) {
431                                staticallyIncludedDestinations = configuration.getStaticallyIncludedDestinations().toArray(
432                                        new ActiveMQDestination[configuration.getStaticallyIncludedDestinations().size()]);
433                            }
434                            if (configuration.getDynamicallyIncludedDestinations() != null) {
435                                dynamicallyIncludedDestinations = configuration.getDynamicallyIncludedDestinations()
436                                        .toArray(
437                                                new ActiveMQDestination[configuration.getDynamicallyIncludedDestinations()
438                                                        .size()]);
439                            }
440                        } catch (Throwable t) {
441                            LOG.error("Error mapping remote destinations", t);
442                        }
443                        serviceRemoteBrokerInfo(command);
444                        // Let the local broker know the remote broker's ID.
445                        localBroker.oneway(command);
446                        // new peer broker (a consumer can work with remote broker also)
447                        brokerService.getBroker().addBroker(null, remoteBrokerInfo);
448                    } else if (command.getClass() == ConnectionError.class) {
449                        ConnectionError ce = (ConnectionError) command;
450                        serviceRemoteException(ce.getException());
451                    } else {
452                        if (isDuplex()) {
453                            if (command.isMessage()) {
454                                ActiveMQMessage message = (ActiveMQMessage) command;
455                                if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
456                                    || AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
457                                    serviceRemoteConsumerAdvisory(message.getDataStructure());
458                                    ackAdvisory(message);
459                                } else {
460                                    if (!isPermissableDestination(message.getDestination(), true)) {
461                                        return;
462                                    }
463                                    if (message.isResponseRequired()) {
464                                        Response reply = new Response();
465                                        reply.setCorrelationId(message.getCommandId());
466                                        localBroker.oneway(message);
467                                        remoteBroker.oneway(reply);
468                                    } else {
469                                        localBroker.oneway(message);
470                                    }
471                                }
472                            } else {
473                                switch (command.getDataStructureType()) {
474                                case ConnectionInfo.DATA_STRUCTURE_TYPE:
475                                case SessionInfo.DATA_STRUCTURE_TYPE:
476                                case ProducerInfo.DATA_STRUCTURE_TYPE:
477                                    localBroker.oneway(command);
478                                    break;
479                                case MessageAck.DATA_STRUCTURE_TYPE:
480                                    MessageAck ack = (MessageAck) command;
481                                    DemandSubscription localSub = subscriptionMapByRemoteId.get(ack.getConsumerId());
482                                    if (localSub != null) {
483                                        ack.setConsumerId(localSub.getLocalInfo().getConsumerId());
484                                        localBroker.oneway(ack);
485                                    } else {
486                                        LOG.warn("Matching local subscription not found for ack: " + ack);
487                                    }
488                                    break;
489                                case ConsumerInfo.DATA_STRUCTURE_TYPE:
490                                    localStartedLatch.await();
491                                    if (started.get()) {
492                                        if (!addConsumerInfo((ConsumerInfo) command)) {
493                                            if (LOG.isDebugEnabled()) {
494                                                LOG.debug("Ignoring ConsumerInfo: " + command);
495                                            }
496                                        } else {
497                                            if (LOG.isTraceEnabled()) {
498                                                LOG.trace("Adding ConsumerInfo: " + command);
499                                            }
500                                        }
501                                    } else {
502                                        // received a subscription whilst stopping
503                                        LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
504                                    }
505                                    break;
506                                case ShutdownInfo.DATA_STRUCTURE_TYPE:
507                                    // initiator is shutting down, controlled case
508                                    // abortive close dealt with by inactivity monitor
509                                    LOG.info("Stopping network bridge on shutdown of remote broker");
510                                    serviceRemoteException(new IOException(command.toString()));
511                                    break;
512                                default:
513                                    if (LOG.isDebugEnabled()) {
514                                        LOG.debug("Ignoring remote command: " + command);
515                                    }
516                                }
517                            }
518                        } else {
519                            switch (command.getDataStructureType()) {
520                            case KeepAliveInfo.DATA_STRUCTURE_TYPE:
521                            case WireFormatInfo.DATA_STRUCTURE_TYPE:
522                            case ShutdownInfo.DATA_STRUCTURE_TYPE:
523                                break;
524                            default:
525                                LOG.warn("Unexpected remote command: " + command);
526                            }
527                        }
528                    }
529                } catch (Throwable e) {
530                    if (LOG.isDebugEnabled()) {
531                        LOG.debug("Exception processing remote command: " + command, e);
532                    }
533                    serviceRemoteException(e);
534                }
535            }
536        }
537    
538        private void ackAdvisory(Message message) throws IOException {
539            demandConsumerDispatched++;
540            if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
541                MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
542                ack.setConsumerId(demandConsumerInfo.getConsumerId());
543                remoteBroker.oneway(ack);
544                demandConsumerDispatched = 0;
545            }
546        }
547    
548        private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
549            final int networkTTL = configuration.getNetworkTTL();
550            if (data.getClass() == ConsumerInfo.class) {
551                // Create a new local subscription
552                ConsumerInfo info = (ConsumerInfo) data;
553                BrokerId[] path = info.getBrokerPath();
554    
555                if (info.isBrowser()) {
556                    if (LOG.isDebugEnabled()) {
557                        LOG.info(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", browsers explicitly suppressed");
558                    }
559                    return;
560                }
561    
562                if (path != null && path.length >= networkTTL) {
563                    if (LOG.isDebugEnabled()) {
564                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
565                    }
566                    return;
567                }
568                if (contains(path, localBrokerPath[0])) {
569                    // Ignore this consumer as it's a consumer we locally sent to the broker.
570                    if (LOG.isDebugEnabled()) {
571                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
572                    }
573                    return;
574                }
575                if (!isPermissableDestination(info.getDestination())) {
576                    // ignore if not in the permitted or in the excluded list
577                    if (LOG.isDebugEnabled()) {
578                        LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
579                    }
580                    return;
581                }
582    
583                // in a cyclic network there can be multiple bridges per broker that can propagate
584                // a network subscription so there is a need to synchronize on a shared entity
585                synchronized (brokerService.getVmConnectorURI()) {
586                    if (addConsumerInfo(info)) {
587                        if (LOG.isDebugEnabled()) {
588                            LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
589                        }
590                    } else {
591                        if (LOG.isDebugEnabled()) {
592                            LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
593                        }
594                    }
595                }
596            } else if (data.getClass() == DestinationInfo.class) {
597                // It's a destination info - we want to pass up information about temporary destinations
598                DestinationInfo destInfo = (DestinationInfo) data;
599                BrokerId[] path = destInfo.getBrokerPath();
600                if (path != null && path.length >= networkTTL) {
601                    if (LOG.isDebugEnabled()) {
602                        LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
603                    }
604                    return;
605                }
606                if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
607                    // Ignore this consumer as it's a consumer we locally sent to the broker.
608                    if (LOG.isDebugEnabled()) {
609                        LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
610                    }
611                    return;
612                }
613                destInfo.setConnectionId(localConnectionInfo.getConnectionId());
614                if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
615                    // re-set connection id so comes from here
616                    ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
617                    tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
618                }
619                destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
620                if (LOG.isTraceEnabled()) {
621                    LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo);
622                }
623                localBroker.oneway(destInfo);
624            } else if (data.getClass() == RemoveInfo.class) {
625                ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
626                removeDemandSubscription(id);
627            }
628        }
629    
630        public void serviceLocalException(Throwable error) {
631            if (!disposed.get()) {
632                LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
633                LOG.debug("The local Exception was:" + error, error);
634                brokerService.getTaskRunnerFactory().execute(new Runnable() {
635                    public void run() {
636                        ServiceSupport.dispose(getControllingService());
637                    }
638                });
639                fireBridgeFailed();
640            }
641        }
642    
643        protected Service getControllingService() {
644            return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
645        }
646    
647        protected void addSubscription(DemandSubscription sub) throws IOException {
648            if (sub != null) {
649                localBroker.oneway(sub.getLocalInfo());
650            }
651        }
652    
653        protected void removeSubscription(final DemandSubscription sub) throws IOException {
654            if (sub != null) {
655                if (LOG.isDebugEnabled()) {
656                    LOG.debug(configuration.getBrokerName() + " remove local subscription for remote " + sub.getRemoteInfo().getConsumerId());
657                }
658                subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
659                subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
660    
661                // continue removal in separate thread to free up this thread for outstanding responses
662                brokerService.getTaskRunnerFactory().execute(new Runnable() {
663                    public void run() {
664                        sub.waitForCompletion();
665                        try {
666                            localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
667                        } catch (IOException e) {
668                            LOG.warn("failed to deliver remove command for local subscription, for remote " + sub.getRemoteInfo().getConsumerId(), e);
669                        }
670                    }
671                });
672            }
673        }
674    
675        protected Message configureMessage(MessageDispatch md) throws IOException {
676            Message message = md.getMessage().copy();
677            // Update the packet to show where it came from.
678            message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
679            message.setProducerId(producerInfo.getProducerId());
680            message.setDestination(md.getDestination());
681            if (message.getOriginalTransactionId() == null) {
682                message.setOriginalTransactionId(message.getTransactionId());
683            }
684            message.setTransactionId(null);
685            if (configuration.isUseCompression()) {
686                message.compress();
687            }
688            return message;
689        }
690    
691        protected void serviceLocalCommand(Command command) {
692            if (!disposed.get()) {
693                try {
694                    if (command.isMessageDispatch()) {
695                        safeWaitUntilStarted();
696                        enqueueCounter.incrementAndGet();
697                        final MessageDispatch md = (MessageDispatch) command;
698                        final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
699                        if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
700    
701                            if (suppressMessageDispatch(md, sub)) {
702                                if (LOG.isDebugEnabled()) {
703                                    LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + md.getMessage());
704                                }
705                                // still ack as it may be durable
706                                try {
707                                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
708                                } finally {
709                                    sub.decrementOutstandingResponses();
710                                }
711                                return;
712                            }
713    
714                            Message message = configureMessage(md);
715                            if (LOG.isDebugEnabled()) {
716                                LOG.debug("bridging (" + configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + (LOG.isTraceEnabled() ? message : message.getMessageId()) + ", consumer: " + md.getConsumerId() + ", destination " + message.getDestination() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
717                            }
718    
719                            if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) {
720    
721                                // If the message was originally sent using async
722                                // send, we will preserve that QOS
723                                // by bridging it using an async send (small chance
724                                // of message loss).
725                                try {
726                                    remoteBroker.oneway(message);
727                                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
728                                    dequeueCounter.incrementAndGet();
729                                } finally {
730                                    sub.decrementOutstandingResponses();
731                                }
732    
733                            } else {
734    
735                                // The message was not sent using async send, so we
736                                // should only ack the local
737                                // broker when we get confirmation that the remote
738                                // broker has received the message.
739                                ResponseCallback callback = new ResponseCallback() {
740                                    public void onCompletion(FutureResponse future) {
741                                        try {
742                                            Response response = future.getResult();
743                                            if (response.isException()) {
744                                                ExceptionResponse er = (ExceptionResponse) response;
745                                                serviceLocalException(er.getException());
746                                            } else {
747                                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
748                                                dequeueCounter.incrementAndGet();
749                                            }
750                                        } catch (IOException e) {
751                                            serviceLocalException(e);
752                                        } finally {
753                                            sub.decrementOutstandingResponses();
754                                        }
755                                    }
756                                };
757    
758                                remoteBroker.asyncRequest(message, callback);
759    
760                            }
761                        } else {
762                            if (LOG.isDebugEnabled()) {
763                                LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
764                            }
765                        }
766                    } else if (command.isBrokerInfo()) {
767                        localBrokerInfo = (BrokerInfo) command;
768                        serviceLocalBrokerInfo(command);
769                    } else if (command.isShutdownInfo()) {
770                        LOG.info(configuration.getBrokerName() + " Shutting down");
771                        stop();
772                    } else if (command.getClass() == ConnectionError.class) {
773                        ConnectionError ce = (ConnectionError) command;
774                        serviceLocalException(ce.getException());
775                    } else {
776                        switch (command.getDataStructureType()) {
777                        case WireFormatInfo.DATA_STRUCTURE_TYPE:
778                            break;
779                        default:
780                            LOG.warn("Unexpected local command: " + command);
781                        }
782                    }
783                } catch (Throwable e) {
784                    LOG.warn("Caught an exception processing local command", e);
785                    serviceLocalException(e);
786                }
787            }
788        }
789    
790        private boolean suppressMessageDispatch(MessageDispatch md, DemandSubscription sub) throws Exception {
791            boolean suppress = false;
792            // for durable subs, suppression via filter leaves dangling acks so we need to
793            // check here and allow the ack irrespective
794            if (sub.getLocalInfo().isDurable()) {
795                MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
796                messageEvalContext.setMessageReference(md.getMessage());
797                messageEvalContext.setDestination(md.getDestination());
798                suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext);
799            }
800            return suppress;
801        }
802    
803        /**
804         * @return Returns the dynamicallyIncludedDestinations.
805         */
806        public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
807            return dynamicallyIncludedDestinations;
808        }
809    
810        /**
811         * @param dynamicallyIncludedDestinations The
812         *            dynamicallyIncludedDestinations to set.
813         */
814        public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
815            this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
816        }
817    
818        /**
819         * @return Returns the excludedDestinations.
820         */
821        public ActiveMQDestination[] getExcludedDestinations() {
822            return excludedDestinations;
823        }
824    
825        /**
826         * @param excludedDestinations The excludedDestinations to set.
827         */
828        public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
829            this.excludedDestinations = excludedDestinations;
830        }
831    
832        /**
833         * @return Returns the staticallyIncludedDestinations.
834         */
835        public ActiveMQDestination[] getStaticallyIncludedDestinations() {
836            return staticallyIncludedDestinations;
837        }
838    
839        /**
840         * @param staticallyIncludedDestinations The staticallyIncludedDestinations
841         *            to set.
842         */
843        public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
844            this.staticallyIncludedDestinations = staticallyIncludedDestinations;
845        }
846    
847        /**
848         * @return Returns the durableDestinations.
849         */
850        public ActiveMQDestination[] getDurableDestinations() {
851            return durableDestinations;
852        }
853    
854        /**
855         * @param durableDestinations The durableDestinations to set.
856         */
857        public void setDurableDestinations(ActiveMQDestination[] durableDestinations) {
858            this.durableDestinations = durableDestinations;
859        }
860    
861        /**
862         * @return Returns the localBroker.
863         */
864        public Transport getLocalBroker() {
865            return localBroker;
866        }
867    
868        /**
869         * @return Returns the remoteBroker.
870         */
871        public Transport getRemoteBroker() {
872            return remoteBroker;
873        }
874    
875        /**
876         * @return the createdByDuplex
877         */
878        public boolean isCreatedByDuplex() {
879            return this.createdByDuplex;
880        }
881    
882        /**
883         * @param createdByDuplex the createdByDuplex to set
884         */
885        public void setCreatedByDuplex(boolean createdByDuplex) {
886            this.createdByDuplex = createdByDuplex;
887        }
888    
889        public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
890            if (brokerPath != null) {
891                for (int i = 0; i < brokerPath.length; i++) {
892                    if (brokerId.equals(brokerPath[i])) {
893                        return true;
894                    }
895                }
896            }
897            return false;
898        }
899    
900        protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
901            if (brokerPath == null || brokerPath.length == 0) {
902                return pathsToAppend;
903            }
904            BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
905            System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
906            System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
907            return rc;
908        }
909    
910        protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
911            if (brokerPath == null || brokerPath.length == 0) {
912                return new BrokerId[] { idToAppend };
913            }
914            BrokerId rc[] = new BrokerId[brokerPath.length + 1];
915            System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
916            rc[brokerPath.length] = idToAppend;
917            return rc;
918        }
919    
920        protected boolean isPermissableDestination(ActiveMQDestination destination) {
921            return isPermissableDestination(destination, false);
922        }
923    
924        protected boolean isPermissableDestination(ActiveMQDestination destination, boolean allowTemporary) {
925            // Are we not bridging temporary destinations?
926            if (destination.isTemporary()) {
927                if (allowTemporary) {
928                    return true;
929                } else {
930                    return configuration.isBridgeTempDestinations();
931                }
932            }
933    
934            ActiveMQDestination[] dests = staticallyIncludedDestinations;
935            if (dests != null && dests.length > 0) {
936                for (int i = 0; i < dests.length; i++) {
937                    ActiveMQDestination match = dests[i];
938                    DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
939                    if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
940                        return true;
941                    }
942                }
943            }
944    
945            dests = excludedDestinations;
946            if (dests != null && dests.length > 0) {
947                for (int i = 0; i < dests.length; i++) {
948                    ActiveMQDestination match = dests[i];
949                    DestinationFilter exclusionFilter = DestinationFilter.parseFilter(match);
950                    if (match != null && exclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
951                        return false;
952                    }
953                }
954            }
955    
956            dests = dynamicallyIncludedDestinations;
957            if (dests != null && dests.length > 0) {
958                for (int i = 0; i < dests.length; i++) {
959                    ActiveMQDestination match = dests[i];
960                    DestinationFilter inclusionFilter = DestinationFilter.parseFilter(match);
961                    if (match != null && inclusionFilter.matches(destination) && dests[i].getDestinationType() == destination.getDestinationType()) {
962                        return true;
963                    }
964                }
965    
966                return false;
967            }
968            return true;
969        }
970    
971        /**
972         * Subscriptions for these destinations are always created
973         */
974        protected void setupStaticDestinations() {
975            ActiveMQDestination[] dests = staticallyIncludedDestinations;
976            if (dests != null) {
977                for (int i = 0; i < dests.length; i++) {
978                    ActiveMQDestination dest = dests[i];
979                    DemandSubscription sub = createDemandSubscription(dest);
980                    try {
981                        addSubscription(sub);
982                    } catch (IOException e) {
983                        LOG.error("Failed to add static destination " + dest, e);
984                    }
985                    if (LOG.isTraceEnabled()) {
986                        LOG.trace("bridging messages for static destination: " + dest);
987                    }
988                }
989            }
990        }
991    
992        protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
993            boolean consumerAdded = false;
994            ConsumerInfo info = consumerInfo.copy();
995            addRemoteBrokerToBrokerPath(info);
996            DemandSubscription sub = createDemandSubscription(info);
997            if (sub != null) {
998                if (duplicateSuppressionIsRequired(sub)) {
999                    undoMapRegistration(sub);
1000                } else {
1001                    addSubscription(sub);
1002                    consumerAdded = true;
1003                }
1004            }
1005            return consumerAdded;
1006        }
1007    
1008        private void undoMapRegistration(DemandSubscription sub) {
1009            subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
1010            subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
1011        }
1012    
1013        /*
1014         * check our existing subs networkConsumerIds against the list of network ids in this subscription
1015         * A match means a duplicate which we suppress for topics and maybe for queues
1016         */
1017        private boolean duplicateSuppressionIsRequired(DemandSubscription candidate) {
1018            final ConsumerInfo consumerInfo = candidate.getRemoteInfo();
1019            boolean suppress = false;
1020    
1021            if (consumerInfo.getDestination().isQueue() && !configuration.isSuppressDuplicateQueueSubscriptions() ||
1022                    consumerInfo.getDestination().isTopic() && !configuration.isSuppressDuplicateTopicSubscriptions()) {
1023                return suppress;
1024            }
1025    
1026            List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
1027            Collection<Subscription> currentSubs =
1028                getRegionSubscriptions(consumerInfo.getDestination());
1029            for (Subscription sub : currentSubs) {
1030                List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
1031                if (!networkConsumers.isEmpty()) {
1032                    if (matchFound(candidateConsumers, networkConsumers)) {
1033                        if (isInActiveDurableSub(sub)) {
1034                            suppress = false;
1035                        } else {
1036                            suppress = hasLowerPriority(sub, candidate.getLocalInfo());
1037                        }
1038                        break;
1039                    }
1040                }
1041            }
1042            return suppress;
1043        }
1044    
1045        private boolean isInActiveDurableSub(Subscription sub) {
1046            return  (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isActive());
1047        }
1048    
1049        private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
1050            boolean suppress = false;
1051    
1052            if (existingSub.getConsumerInfo().getPriority() >= candidateInfo.getPriority()) {
1053                if (LOG.isDebugEnabled()) {
1054                    LOG.debug(configuration.getBrokerName() + " Ignoring duplicate subscription from " + remoteBrokerName
1055                            + ", sub: " + candidateInfo + " is duplicated by network subscription with equal or higher network priority: "
1056                            + existingSub  + ", networkConsumerIds: " + existingSub.getConsumerInfo().getNetworkConsumerIds());
1057                }
1058                suppress = true;
1059            } else {
1060                // remove the existing lower priority duplicate and allow this candidate
1061                try {
1062                    removeDuplicateSubscription(existingSub);
1063    
1064                    if (LOG.isDebugEnabled()) {
1065                        LOG.debug(configuration.getBrokerName() + " Replacing duplicate subscription " + existingSub.getConsumerInfo()
1066                                + " with sub from " + remoteBrokerName
1067                                + ", which has a higher priority, new sub: " + candidateInfo + ", networkComsumerIds: "
1068                                + candidateInfo.getNetworkConsumerIds());
1069                    }
1070                } catch (IOException e) {
1071                    LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
1072                }
1073            }
1074            return suppress;
1075        }
1076    
1077        private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
1078            for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
1079                if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
1080                    break;
1081                }
1082            }
1083        }
1084    
1085        private boolean matchFound(List<ConsumerId> candidateConsumers, List<ConsumerId> networkConsumers) {
1086            boolean found = false;
1087            for (ConsumerId aliasConsumer : networkConsumers) {
1088                if (candidateConsumers.contains(aliasConsumer)) {
1089                    found = true;
1090                    break;
1091                }
1092            }
1093            return found;
1094        }
1095    
1096        private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
1097            RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
1098            Region region;
1099            Collection<Subscription> subs;
1100    
1101            region = null;
1102            switch ( dest.getDestinationType() )
1103            {
1104                case ActiveMQDestination.QUEUE_TYPE:
1105                    region = region_broker.getQueueRegion();
1106                    break;
1107    
1108                case ActiveMQDestination.TOPIC_TYPE:
1109                    region = region_broker.getTopicRegion();
1110                    break;
1111    
1112                case ActiveMQDestination.TEMP_QUEUE_TYPE:
1113                    region = region_broker.getTempQueueRegion();
1114                    break;
1115    
1116                case ActiveMQDestination.TEMP_TOPIC_TYPE:
1117                    region = region_broker.getTempTopicRegion();
1118                    break;
1119            }
1120    
1121            if ( region instanceof AbstractRegion ) {
1122                subs = ((AbstractRegion) region).getSubscriptions().values();
1123            } else {
1124                subs = null;
1125            }
1126    
1127            return subs;
1128        }
1129    
1130        protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
1131            //add our original id to ourselves
1132            info.addNetworkConsumerId(info.getConsumerId());
1133            return doCreateDemandSubscription(info);
1134        }
1135    
1136        protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
1137            DemandSubscription result = new DemandSubscription(info);
1138            result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1139            if (info.getDestination().isTemporary()) {
1140                // reset the local connection Id
1141                ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
1142                dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
1143            }
1144    
1145            if (configuration.isDecreaseNetworkConsumerPriority()) {
1146                byte priority = (byte) configuration.getConsumerPriorityBase();
1147                if (info.getBrokerPath() != null && info.getBrokerPath().length > 1) {
1148                    // The longer the path to the consumer, the less it's consumer priority.
1149                    priority -= info.getBrokerPath().length + 1;
1150                }
1151                result.getLocalInfo().setPriority(priority);
1152                if (LOG.isDebugEnabled()) {
1153                    LOG.debug(configuration.getBrokerName() + " using priority :" + priority + " for subscription: " + info);
1154                }
1155            }
1156            configureDemandSubscription(info, result);
1157            return result;
1158        }
1159    
1160        final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
1161            ConsumerInfo info = new ConsumerInfo();
1162            info.setDestination(destination);
1163    
1164            // the remote info held by the DemandSubscription holds the original consumerId,
1165            // the local info get's overwritten
1166            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
1167            DemandSubscription result = null;
1168            try {
1169                result = createDemandSubscription(info);
1170            } catch (IOException e) {
1171                LOG.error("Failed to create DemandSubscription ", e);
1172            }
1173            return result;
1174        }
1175    
1176        protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException {
1177            sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
1178            sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
1179            subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
1180            subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
1181    
1182            sub.setNetworkBridgeFilter(createNetworkBridgeFilter(info));
1183            if (!info.isDurable()) {
1184                // This works for now since we use a VM connection to the local broker.
1185                // may need to change if we ever subscribe to a remote broker.
1186                sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
1187            } else  {
1188                // need to ack this message if it is ignored as it is durable so
1189                // we check before we send. see: suppressMessageDispatch()
1190            }
1191        }
1192    
1193        protected void removeDemandSubscription(ConsumerId id) throws IOException {
1194            DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
1195            if (LOG.isDebugEnabled()) {
1196                LOG.debug(configuration.getBrokerName() + " remove request on " + localBroker + " from " + remoteBrokerName + " , consumer id: " + id + ", matching sub: " + sub);
1197            }
1198            if (sub != null) {
1199                removeSubscription(sub);
1200                if (LOG.isDebugEnabled()) {
1201                    LOG.debug(configuration.getBrokerName() + " removed sub on " + localBroker + " from " + remoteBrokerName + " :  " + sub.getRemoteInfo());
1202                }
1203            }
1204        }
1205    
1206        protected boolean removeDemandSubscriptionByLocalId(ConsumerId consumerId) {
1207            boolean removeDone = false;
1208            DemandSubscription sub = subscriptionMapByLocalId.get(consumerId);
1209            if (sub != null) {
1210                try {
1211                    removeDemandSubscription(sub.getRemoteInfo().getConsumerId());
1212                    removeDone = true;
1213                } catch (IOException e) {
1214                    LOG.debug("removeDemandSubscriptionByLocalId failed for localId: " + consumerId, e);
1215                }
1216            }
1217            return removeDone;
1218        }
1219    
1220        /**
1221         * Performs a timed wait on the started latch and then checks for disposed before performing
1222         * another wait each time the the started wait times out.
1223         *
1224         * @throws InterruptedException
1225         */
1226        protected void safeWaitUntilStarted() throws InterruptedException {
1227            while (!disposed.get()) {
1228                if (startedLatch.await(1, TimeUnit.SECONDS)) {
1229                    return;
1230                }
1231            }
1232        }
1233    
1234        protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
1235            NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
1236            if (brokerService != null && brokerService.getDestinationPolicy() != null) {
1237                PolicyEntry entry = brokerService.getDestinationPolicy().getEntryFor(info.getDestination());
1238                if (entry != null && entry.getNetworkBridgeFilterFactory() != null) {
1239                    filterFactory = entry.getNetworkBridgeFilterFactory();
1240                }
1241            }
1242            return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL());
1243        }
1244    
1245        protected void serviceLocalBrokerInfo(Command command) throws InterruptedException {
1246            synchronized (brokerInfoMutex) {
1247                if (remoteBrokerId != null) {
1248                    if (remoteBrokerId.equals(localBrokerId)) {
1249                        if (LOG.isTraceEnabled()) {
1250                            LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
1251                        }
1252                        safeWaitUntilStarted();
1253                        ServiceSupport.dispose(this);
1254                    }
1255                }
1256            }
1257        }
1258    
1259        protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {
1260            info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(), getRemoteBrokerPath()));
1261        }
1262    
1263        protected void serviceRemoteBrokerInfo(Command command) throws IOException {
1264            synchronized (brokerInfoMutex) {
1265                BrokerInfo remoteBrokerInfo = (BrokerInfo)command;
1266                remoteBrokerId = remoteBrokerInfo.getBrokerId();
1267                remoteBrokerPath[0] = remoteBrokerId;
1268                remoteBrokerName = remoteBrokerInfo.getBrokerName();
1269                if (localBrokerId != null) {
1270                    if (localBrokerId.equals(remoteBrokerId)) {
1271                        if (LOG.isTraceEnabled()) {
1272                            LOG.trace(configuration.getBrokerName() + " disconnecting remote loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
1273                        }
1274                        ServiceSupport.dispose(this);
1275                    }
1276                }
1277                if (!disposed.get()) {
1278                    triggerLocalStartBridge();
1279                }
1280            }
1281        }
1282    
1283        protected  BrokerId[] getRemoteBrokerPath() {
1284            return remoteBrokerPath;
1285        }
1286    
1287        public void setNetworkBridgeListener(NetworkBridgeListener listener) {
1288            this.networkBridgeListener = listener;
1289        }
1290    
1291        private void fireBridgeFailed() {
1292            NetworkBridgeListener l = this.networkBridgeListener;
1293            if (l != null) {
1294                l.bridgeFailed();
1295            }
1296        }
1297    
1298        public String getRemoteAddress() {
1299            return remoteBroker.getRemoteAddress();
1300        }
1301    
1302        public String getLocalAddress() {
1303            return localBroker.getRemoteAddress();
1304        }
1305    
1306        public String getRemoteBrokerName() {
1307            return remoteBrokerInfo == null ? null : remoteBrokerInfo.getBrokerName();
1308        }
1309    
1310        public String getLocalBrokerName() {
1311            return localBrokerInfo == null ? null : localBrokerInfo.getBrokerName();
1312        }
1313    
1314        public long getDequeueCounter() {
1315            return dequeueCounter.get();
1316        }
1317    
1318        public long getEnqueueCounter() {
1319            return enqueueCounter.get();
1320        }
1321    
1322        protected boolean isDuplex() {
1323            return configuration.isDuplex() || createdByDuplex;
1324        }
1325    
1326        public ConcurrentHashMap<ConsumerId, DemandSubscription> getLocalSubscriptionMap() {
1327            return subscriptionMapByRemoteId;
1328        }
1329    
1330        public void setBrokerService(BrokerService brokerService) {
1331            this.brokerService = brokerService;
1332            this.localBrokerId = brokerService.getRegionBroker().getBrokerId();
1333            localBrokerPath[0] = localBrokerId;
1334        }
1335    
1336        public void setMbeanObjectName(ObjectName objectName) {
1337            this.mbeanObjectName = objectName;
1338        }
1339    
1340        public ObjectName getMbeanObjectName() {
1341            return mbeanObjectName;
1342        }
1343    }