001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.policy;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.List;
022    
023    import org.apache.activemq.broker.region.MessageReference;
024    import org.apache.activemq.broker.region.Subscription;
025    import org.apache.activemq.command.ConsumerId;
026    import org.apache.activemq.command.ConsumerInfo;
027    import org.apache.activemq.filter.MessageEvaluationContext;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * dispatch policy that ignores lower priority duplicate network consumers,
033     * used in conjunction with network bridge suppresDuplicateTopicSubscriptions
034     * 
035     * @org.apache.xbean.XBean
036     */
037    public class PriorityNetworkDispatchPolicy extends SimpleDispatchPolicy {
038    
039        private static final Logger LOG = LoggerFactory.getLogger(PriorityNetworkDispatchPolicy.class);
040        @Override
041        public boolean dispatch(MessageReference node,
042                MessageEvaluationContext msgContext,
043                List<Subscription> consumers) throws Exception {
044            
045            List<Subscription> duplicateFreeSubs = new ArrayList<Subscription>();
046            synchronized (consumers) {
047                for (Subscription sub: consumers) {
048                    ConsumerInfo info = sub.getConsumerInfo();
049                    if (info.isNetworkSubscription()) {    
050                        boolean highestPrioritySub = true;
051                        for (Iterator<Subscription> it =  duplicateFreeSubs.iterator(); it.hasNext(); ) {
052                            Subscription candidate = it.next();
053                            if (matches(candidate, info)) {
054                                if (hasLowerPriority(candidate, info)) {
055                                    it.remove();
056                                } else {
057                                    // higher priority matching sub exists
058                                    highestPrioritySub = false;
059                                    if (LOG.isDebugEnabled()) {
060                                    LOG.debug("ignoring lower priority: " + candidate 
061                                            + "[" +candidate.getConsumerInfo().getNetworkConsumerIds() +", "
062                                            + candidate.getConsumerInfo().getNetworkConsumerIds() +"] in favour of: " 
063                                            + sub
064                                            + "[" +sub.getConsumerInfo().getNetworkConsumerIds() +", "
065                                            + sub.getConsumerInfo().getNetworkConsumerIds() +"]");
066                                    }
067                                }
068                            }
069                        }
070                        if (highestPrioritySub) {
071                            duplicateFreeSubs.add(sub);
072                        } 
073                    } else {
074                        duplicateFreeSubs.add(sub);
075                    }
076                }
077            }
078            
079            return super.dispatch(node, msgContext, duplicateFreeSubs);
080        }
081    
082        private boolean hasLowerPriority(Subscription candidate,
083                ConsumerInfo info) {
084           return candidate.getConsumerInfo().getPriority() < info.getPriority();
085        }
086    
087        private boolean matches(Subscription candidate, ConsumerInfo info) {
088            boolean matched = false;
089            for (ConsumerId candidateId: candidate.getConsumerInfo().getNetworkConsumerIds()) {
090                for (ConsumerId subId: info.getNetworkConsumerIds()) {
091                    if (candidateId.equals(subId)) {
092                        matched = true;
093                        break;
094                    }
095                }
096            }
097            return matched;
098        }
099    
100    }