001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import org.apache.activemq.filter.BooleanExpression;
020    import org.apache.activemq.filter.MessageEvaluationContext;
021    import org.apache.activemq.util.JMSExceptionSupport;
022    import org.slf4j.Logger;
023    import org.slf4j.LoggerFactory;
024    
025    import javax.jms.JMSException;
026    import java.io.IOException;
027    import java.util.Arrays;
028    
029    /**
030     * @openwire:marshaller code="91"
031     *
032     */
033    public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
034    
035        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
036        static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
037    
038        protected BrokerId networkBrokerId;
039        protected int messageTTL;
040        protected int consumerTTL;
041        transient ConsumerInfo consumerInfo;
042    
043        public NetworkBridgeFilter() {
044        }
045    
046        public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int messageTTL, int consumerTTL) {
047            this.networkBrokerId = networkBrokerId;
048            this.messageTTL = messageTTL;
049            this.consumerTTL = consumerTTL;
050            this.consumerInfo = consumerInfo;
051        }
052    
053        public byte getDataStructureType() {
054            return DATA_STRUCTURE_TYPE;
055        }
056    
057        public boolean isMarshallAware() {
058            return false;
059        }
060    
061        public boolean matches(MessageEvaluationContext mec) throws JMSException {
062            try {
063                // for Queues - the message can be acknowledged and dropped whilst
064                // still
065                // in the dispatch loop
066                // so need to get the reference to it
067                Message message = mec.getMessage();
068                return message != null && matchesForwardingFilter(message, mec);
069            } catch (IOException e) {
070                throw JMSExceptionSupport.create(e);
071            }
072        }
073    
074        public Object evaluate(MessageEvaluationContext message) throws JMSException {
075            return matches(message) ? Boolean.TRUE : Boolean.FALSE;
076        }
077    
078        protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) {
079    
080            if (contains(message.getBrokerPath(), networkBrokerId)) {
081                if (LOG.isTraceEnabled()) {
082                    LOG.trace("Message all ready routed once through target broker ("
083                            + networkBrokerId + "), path: "
084                            + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
085                }
086                return false;
087            }
088    
089            int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
090    
091            if (messageTTL > -1 && hops >= messageTTL) {
092                if (LOG.isTraceEnabled()) {
093                    LOG.trace("Message restricted to " + messageTTL + " network hops ignoring: " + message);
094                }
095                return false;
096            }
097    
098            if (message.isAdvisory()) {
099                if (consumerInfo != null && consumerInfo.isNetworkSubscription()) {
100                    // they will be interpreted by the bridge leading to dup commands
101                    if (LOG.isTraceEnabled()) {
102                        LOG.trace("not propagating advisory to network sub: " + consumerInfo.getConsumerId() + ", message: "+ message);
103                    }
104                    return false;
105                } else if ( message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
106                    ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
107                    hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
108                    if (consumerTTL > -1 && hops >= consumerTTL) {
109                        if (LOG.isTraceEnabled()) {
110                            LOG.trace("ConsumerInfo advisory restricted to " + consumerTTL + " network hops ignoring: " + message);
111                        }
112                        return false;
113                    }
114    
115                    if (contains(info.getBrokerPath(), networkBrokerId)) {
116                        LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
117                                + networkBrokerId + "), path: "
118                                + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
119                        return false;
120                    }
121                }
122            }
123            return true;
124        }
125    
126        public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
127            if (brokerPath != null && brokerId != null) {
128                for (int i = 0; i < brokerPath.length; i++) {
129                    if (brokerId.equals(brokerPath[i])) {
130                        return true;
131                    }
132                }
133            }
134            return false;
135        }
136    
137        // keep for backward compat with older
138        // wire formats
139        public int getNetworkTTL() {
140            return messageTTL;
141        }
142    
143        public void setNetworkTTL(int networkTTL) {
144            messageTTL = networkTTL;
145            consumerTTL = networkTTL;
146        }
147    
148        /**
149         * @openwire:property version=1 cache=true
150         */
151        public BrokerId getNetworkBrokerId() {
152            return networkBrokerId;
153        }
154    
155        public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
156            this.networkBrokerId = remoteBrokerPath;
157        }
158    
159        public void setMessageTTL(int messageTTL) {
160            this.messageTTL = messageTTL;
161        }
162    
163        /**
164         * @openwire:property version=10
165         */
166        public int getMessageTTL() {
167            return this.messageTTL;
168        }
169    
170        public void setConsumerTTL(int consumerTTL) {
171            this.consumerTTL = consumerTTL;
172        }
173    
174        /**
175         * @openwire:property version=10
176         */
177        public int getConsumerTTL() {
178            return this.consumerTTL;
179        }
180    }