001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import org.apache.activemq.filter.BooleanExpression;
020    import org.apache.activemq.filter.MessageEvaluationContext;
021    import org.apache.activemq.util.JMSExceptionSupport;
022    import org.slf4j.Logger;
023    import org.slf4j.LoggerFactory;
024    
025    import javax.jms.JMSException;
026    import java.io.IOException;
027    import java.util.Arrays;
028    
029    /**
030     * @openwire:marshaller code="91"
031     * 
032     */
033    public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
034    
035        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
036        static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
037    
038        protected BrokerId networkBrokerId;
039        protected int networkTTL;
040        transient ConsumerInfo consumerInfo;
041    
042        public NetworkBridgeFilter() {
043        }
044    
045        public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int networkTTL) {
046            this.networkBrokerId = networkBrokerId;
047            this.networkTTL = networkTTL;
048            this.consumerInfo = consumerInfo;
049        }
050    
051        public byte getDataStructureType() {
052            return DATA_STRUCTURE_TYPE;
053        }
054    
055        public boolean isMarshallAware() {
056            return false;
057        }
058    
059        public boolean matches(MessageEvaluationContext mec) throws JMSException {
060            try {
061                // for Queues - the message can be acknowledged and dropped whilst
062                // still
063                // in the dispatch loop
064                // so need to get the reference to it
065                Message message = mec.getMessage();
066                return message != null && matchesForwardingFilter(message, mec);
067            } catch (IOException e) {
068                throw JMSExceptionSupport.create(e);
069            }
070        }
071    
072        public Object evaluate(MessageEvaluationContext message) throws JMSException {
073            return matches(message) ? Boolean.TRUE : Boolean.FALSE;
074        }
075    
076        protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) {
077    
078            if (contains(message.getBrokerPath(), networkBrokerId)) {
079                if (LOG.isTraceEnabled()) {
080                    LOG.trace("Message all ready routed once through target broker ("
081                            + networkBrokerId + "), path: "
082                            + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
083                }
084                return false;
085            }
086    
087            int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
088    
089            if (hops >= networkTTL) {
090                if (LOG.isTraceEnabled()) {
091                    LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
092                }
093                return false;
094            }
095    
096            if (message.isAdvisory()) {
097                if (consumerInfo != null && consumerInfo.isNetworkSubscription()) {
098                    // they will be interpreted by the bridge leading to dup commands
099                    if (LOG.isTraceEnabled()) {
100                        LOG.trace("not propagating advisory to network sub: " + consumerInfo.getConsumerId() + ", message: "+ message);
101                    }
102                    return false;
103                } else if ( message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
104                    ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
105                    hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
106                    if (hops >= networkTTL) {
107                        if (LOG.isTraceEnabled()) {
108                            LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
109                        }
110                        return false;
111                    }
112    
113                    if (contains(info.getBrokerPath(), networkBrokerId)) {
114                        LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
115                                + networkBrokerId + "), path: "
116                                + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
117                        return false;
118                    }
119                }
120            }
121            return true;
122        }
123    
124        public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
125            if (brokerPath != null && brokerId != null) {
126                for (int i = 0; i < brokerPath.length; i++) {
127                    if (brokerId.equals(brokerPath[i])) {
128                        return true;
129                    }
130                }
131            }
132            return false;
133        }
134    
135        /**
136         * @openwire:property version=1
137         */
138        public int getNetworkTTL() {
139            return networkTTL;
140        }
141    
142        public void setNetworkTTL(int networkTTL) {
143            this.networkTTL = networkTTL;
144        }
145    
146        /**
147         * @openwire:property version=1 cache=true
148         */
149        public BrokerId getNetworkBrokerId() {
150            return networkBrokerId;
151        }
152    
153        public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
154            this.networkBrokerId = remoteBrokerPath;
155        }
156    
157    }