001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.Arrays;
022    import java.util.List;
023    
024    import org.apache.activemq.command.BrokerId;
025    import org.apache.activemq.command.ConsumerId;
026    import org.apache.activemq.command.ConsumerInfo;
027    import org.apache.activemq.filter.DestinationFilter;
028    import org.apache.activemq.transport.Transport;
029    import org.slf4j.Logger;
030    import org.slf4j.LoggerFactory;
031    
032    /**
033     * Consolidates subscriptions
034     */
035    public class ConduitBridge extends DemandForwardingBridge {
036        private static final Logger LOG = LoggerFactory.getLogger(ConduitBridge.class);
037    
038        /**
039         * Constructor
040         *
041         * @param localBroker
042         * @param remoteBroker
043         */
044        public ConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
045            super(configuration, localBroker, remoteBroker);
046        }
047    
048        @Override
049        protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
050            if (addToAlreadyInterestedConsumers(info)) {
051                return null; // don't want this subscription added
052            }
053            //add our original id to ourselves
054            info.addNetworkConsumerId(info.getConsumerId());
055            info.setSelector(null);
056            return doCreateDemandSubscription(info);
057        }
058    
059        protected boolean checkPaths(BrokerId[] first, BrokerId[] second) {
060            if (first == null || second == null) {
061                return true;
062            }
063            if (Arrays.equals(first, second)) {
064                return true;
065            }
066    
067            if (first[0].equals(second[0]) && first[first.length - 1].equals(second[second.length - 1])) {
068                return false;
069            } else {
070                return true;
071            }
072        }
073    
074        protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
075            // search through existing subscriptions and see if we have a match
076            boolean matched = false;
077    
078            for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
079                DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
080                if (filter.matches(info.getDestination())) {
081                    if (LOG.isDebugEnabled()) {
082                        LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " +
083                                  ds.getRemoteInfo() + " with sub: " + info.getConsumerId());
084                    }
085                    // add the interest in the subscription
086                    if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
087                        ds.add(info.getConsumerId());
088                    }
089                    matched = true;
090                    // continue - we want interest to any existing DemandSubscriptions
091                }
092            }
093            return matched;
094        }
095    
096        @Override
097        protected void removeDemandSubscription(ConsumerId id) throws IOException {
098            List<DemandSubscription> tmpList = new ArrayList<DemandSubscription>();
099    
100            for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
101                if (ds.remove(id)) {
102                    if (LOG.isDebugEnabled()) {
103                        LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id  + " existing matched sub: " + ds.getRemoteInfo());
104                    }
105                }
106                if (ds.isEmpty()) {
107                    tmpList.add(ds);
108                }
109            }
110    
111            for (DemandSubscription ds : tmpList) {
112                removeSubscription(ds);
113                if (LOG.isDebugEnabled()) {
114                    LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " :  " + ds.getRemoteInfo());
115                }
116            }
117        }
118    }