001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.util.List;
020    
021    import org.apache.activemq.broker.region.Subscription;
022    import org.apache.activemq.command.BrokerId;
023    import org.apache.activemq.command.ConsumerInfo;
024    import org.apache.activemq.command.Message;
025    import org.apache.activemq.command.NetworkBridgeFilter;
026    import org.apache.activemq.filter.MessageEvaluationContext;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * implement conditional behavior for queue consumers, allows replaying back to
032     * origin if no consumers are present on the local broker after a configurable
033     * delay, irrespective of the networkTTL Also allows rate limiting of messages
034     * through the network, useful for static includes
035     *
036     * @org.apache.xbean.XBean
037     */
038    public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
039    
040        boolean replayWhenNoConsumers = false;
041        int replayDelay = 0;
042        int rateLimit = 0;
043        int rateDuration = 1000;
044    
045        @Override
046        public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
047            ConditionalNetworkBridgeFilter filter = new ConditionalNetworkBridgeFilter();
048            filter.setNetworkBrokerId(remoteBrokerPath[0]);
049            filter.setNetworkTTL(networkTimeToLive);
050            filter.setAllowReplayWhenNoConsumers(isReplayWhenNoConsumers());
051            filter.setRateLimit(getRateLimit());
052            filter.setRateDuration(getRateDuration());
053            filter.setReplayDelay(getReplayDelay());
054            return filter;
055        }
056    
057        public void setReplayWhenNoConsumers(boolean replayWhenNoConsumers) {
058            this.replayWhenNoConsumers = replayWhenNoConsumers;
059        }
060    
061        public boolean isReplayWhenNoConsumers() {
062            return replayWhenNoConsumers;
063        }
064    
065        public void setRateLimit(int rateLimit) {
066            this.rateLimit = rateLimit;
067        }
068    
069        public int getRateLimit() {
070            return rateLimit;
071        }
072    
073        public int getRateDuration() {
074            return rateDuration;
075        }
076    
077        public void setRateDuration(int rateDuration) {
078            this.rateDuration = rateDuration;
079        }
080    
081        public int getReplayDelay() {
082            return replayDelay;
083        }
084    
085        public void setReplayDelay(int replayDelay) {
086            this.replayDelay = replayDelay;
087        }
088    
089        private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter {
090            final static Logger LOG = LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class);
091            private int rateLimit;
092            private int rateDuration = 1000;
093            private boolean allowReplayWhenNoConsumers = true;
094            private int replayDelay = 1000;
095    
096            private int matchCount;
097            private long rateDurationEnd;
098    
099            @Override
100            protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
101                boolean match = true;
102                if (mec.getDestination().isQueue()) {
103                    if (contains(message.getBrokerPath(), networkBrokerId)) {
104                        // potential replay back to origin
105                        match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
106    
107                        if (match && LOG.isTraceEnabled()) {
108                            LOG.trace("Replaying  [" + message.getMessageId() + "] for [" + message.getDestination()
109                                    + "] back to origin in the absence of a local consumer");
110                        }
111                    }
112    
113                    if (match && rateLimitExceeded()) {
114                        if (LOG.isTraceEnabled()) {
115                            LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount
116                                    + ">" + rateLimit + "/" + rateDuration);
117                        }
118                        match = false;
119                    }
120    
121                } else {
122                    // use existing logic for topics
123                    match = super.matchesForwardingFilter(message, mec);
124                }
125    
126                return match;
127            }
128    
129            private boolean hasNotJustArrived(Message message) {
130                return replayDelay == 0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
131            }
132    
133            private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) {
134                List<Subscription> consumers = mec.getMessageReference().getRegionDestination().getConsumers();
135                for (Subscription sub : consumers) {
136                    if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
137                        if (LOG.isTraceEnabled()) {
138                            LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination()
139                                    + "] to origin due to existing local consumer: " + sub.getConsumerInfo());
140                        }
141                        return false;
142                    }
143                }
144                return true;
145            }
146    
147            private boolean rateLimitExceeded() {
148                if (rateLimit == 0) {
149                    return false;
150                }
151    
152                if (rateDurationEnd < System.currentTimeMillis()) {
153                    rateDurationEnd = System.currentTimeMillis() + rateDuration;
154                    matchCount = 0;
155                }
156                return ++matchCount > rateLimit;
157            }
158    
159            public void setReplayDelay(int replayDelay) {
160                this.replayDelay = replayDelay;
161            }
162    
163            public void setRateLimit(int rateLimit) {
164                this.rateLimit = rateLimit;
165            }
166    
167            public void setRateDuration(int rateDuration) {
168                this.rateDuration = rateDuration;
169            }
170    
171            public void setAllowReplayWhenNoConsumers(boolean allowReplayWhenNoConsumers) {
172                this.allowReplayWhenNoConsumers = allowReplayWhenNoConsumers;
173            }
174        }
175    }