001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import org.apache.activemq.advisory.AdvisorySupport;
020import org.apache.activemq.filter.BooleanExpression;
021import org.apache.activemq.filter.MessageEvaluationContext;
022import org.apache.activemq.util.JMSExceptionSupport;
023import org.slf4j.Logger;
024import org.slf4j.LoggerFactory;
025
026import javax.jms.JMSException;
027import java.io.IOException;
028import java.util.Arrays;
029
030/**
031 * @openwire:marshaller code="91"
032 *
033 */
034public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
035
036    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.NETWORK_BRIDGE_FILTER;
037    static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
038
039    protected BrokerId networkBrokerId;
040    protected int messageTTL;
041    protected int consumerTTL;
042    transient ConsumerInfo consumerInfo;
043
044    public NetworkBridgeFilter() {
045    }
046
047    public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int messageTTL, int consumerTTL) {
048        this.networkBrokerId = networkBrokerId;
049        this.messageTTL = messageTTL;
050        this.consumerTTL = consumerTTL;
051        this.consumerInfo = consumerInfo;
052    }
053
054    @Override
055    public byte getDataStructureType() {
056        return DATA_STRUCTURE_TYPE;
057    }
058
059    @Override
060    public boolean isMarshallAware() {
061        return false;
062    }
063
064    @Override
065    public boolean matches(MessageEvaluationContext mec) throws JMSException {
066        try {
067            // for Queues - the message can be acknowledged and dropped whilst
068            // still
069            // in the dispatch loop
070            // so need to get the reference to it
071            Message message = mec.getMessage();
072            return message != null && matchesForwardingFilter(message, mec);
073        } catch (IOException e) {
074            throw JMSExceptionSupport.create(e);
075        }
076    }
077
078    @Override
079    public Object evaluate(MessageEvaluationContext message) throws JMSException {
080        return matches(message) ? Boolean.TRUE : Boolean.FALSE;
081    }
082
083    protected boolean matchesForwardingFilter(Message message, MessageEvaluationContext mec) {
084
085        if (contains(message.getBrokerPath(), networkBrokerId)) {
086            if (LOG.isTraceEnabled()) {
087                LOG.trace("Message all ready routed once through target broker ("
088                        + networkBrokerId + "), path: "
089                        + Arrays.toString(message.getBrokerPath()) + " - ignoring: " + message);
090            }
091            return false;
092        }
093
094        int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
095
096        if (messageTTL > -1 && hops >= messageTTL) {
097            if (LOG.isTraceEnabled()) {
098                LOG.trace("Message restricted to " + messageTTL + " network hops ignoring: " + message);
099            }
100            return false;
101        }
102
103        if (message.isAdvisory()) {
104            if (consumerInfo != null && consumerInfo.isNetworkSubscription() && isAdvisoryInterpretedByNetworkBridge(message)) {
105                // they will be interpreted by the bridge leading to dup commands
106                if (LOG.isTraceEnabled()) {
107                    LOG.trace("not propagating advisory to network sub: " + consumerInfo.getConsumerId() + ", message: "+ message);
108                }
109                return false;
110            } else if ( message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
111                ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
112                hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
113                if (consumerTTL > -1 && hops >= consumerTTL) {
114                    if (LOG.isTraceEnabled()) {
115                        LOG.trace("ConsumerInfo advisory restricted to " + consumerTTL + " network hops ignoring: " + message);
116                    }
117                    return false;
118                }
119
120                if (contains(info.getBrokerPath(), networkBrokerId)) {
121                    LOG.trace("ConsumerInfo advisory all ready routed once through target broker ("
122                            + networkBrokerId + "), path: "
123                            + Arrays.toString(info.getBrokerPath()) + " - ignoring: " + message);
124                    return false;
125                }
126            }
127        }
128        return true;
129    }
130
131    public static boolean isAdvisoryInterpretedByNetworkBridge(Message message) {
132        return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) ||
133                AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(message.getDestination()) ||
134                AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination());
135    }
136
137    public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
138        if (brokerPath != null && brokerId != null) {
139            for (int i = 0; i < brokerPath.length; i++) {
140                if (brokerId.equals(brokerPath[i])) {
141                    return true;
142                }
143            }
144        }
145        return false;
146    }
147
148    // keep for backward compat with older
149    // wire formats
150    public int getNetworkTTL() {
151        return messageTTL;
152    }
153
154    public void setNetworkTTL(int networkTTL) {
155        messageTTL = networkTTL;
156        consumerTTL = networkTTL;
157    }
158
159    /**
160     * @openwire:property version=1 cache=true
161     */
162    public BrokerId getNetworkBrokerId() {
163        return networkBrokerId;
164    }
165
166    public void setNetworkBrokerId(BrokerId remoteBrokerPath) {
167        this.networkBrokerId = remoteBrokerPath;
168    }
169
170    public void setMessageTTL(int messageTTL) {
171        this.messageTTL = messageTTL;
172    }
173
174    /**
175     * @openwire:property version=10
176     */
177    public int getMessageTTL() {
178        return this.messageTTL;
179    }
180
181    public void setConsumerTTL(int consumerTTL) {
182        this.consumerTTL = consumerTTL;
183    }
184
185    /**
186     * @openwire:property version=10
187     */
188    public int getConsumerTTL() {
189        return this.consumerTTL;
190    }
191}