001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.network;
018    
019    import java.util.List;
020    
021    import org.apache.activemq.broker.region.Destination;
022    import org.apache.activemq.broker.region.Subscription;
023    import org.apache.activemq.command.BrokerId;
024    import org.apache.activemq.command.ConsumerInfo;
025    import org.apache.activemq.command.Message;
026    import org.apache.activemq.command.NetworkBridgeFilter;
027    import org.apache.activemq.filter.MessageEvaluationContext;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * implement conditional behavior for queue consumers, allows replaying back to
033     * origin if no consumers are present on the local broker after a configurable
034     * delay, irrespective of the networkTTL Also allows rate limiting of messages
035     * through the network, useful for static includes
036     *
037     * @org.apache.xbean.XBean
038     */
039    public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
040    
041        boolean replayWhenNoConsumers = false;
042        int replayDelay = 0;
043        int rateLimit = 0;
044        int rateDuration = 1000;
045    
046        @Override
047        public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
048            ConditionalNetworkBridgeFilter filter = new ConditionalNetworkBridgeFilter();
049            filter.setNetworkBrokerId(remoteBrokerPath[0]);
050            filter.setNetworkTTL(networkTimeToLive);
051            filter.setAllowReplayWhenNoConsumers(isReplayWhenNoConsumers());
052            filter.setRateLimit(getRateLimit());
053            filter.setRateDuration(getRateDuration());
054            filter.setReplayDelay(getReplayDelay());
055            return filter;
056        }
057    
058        public void setReplayWhenNoConsumers(boolean replayWhenNoConsumers) {
059            this.replayWhenNoConsumers = replayWhenNoConsumers;
060        }
061    
062        public boolean isReplayWhenNoConsumers() {
063            return replayWhenNoConsumers;
064        }
065    
066        public void setRateLimit(int rateLimit) {
067            this.rateLimit = rateLimit;
068        }
069    
070        public int getRateLimit() {
071            return rateLimit;
072        }
073    
074        public int getRateDuration() {
075            return rateDuration;
076        }
077    
078        public void setRateDuration(int rateDuration) {
079            this.rateDuration = rateDuration;
080        }
081    
082        public int getReplayDelay() {
083            return replayDelay;
084        }
085    
086        public void setReplayDelay(int replayDelay) {
087            this.replayDelay = replayDelay;
088        }
089    
090        private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter {
091            final static Logger LOG = LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class);
092            private int rateLimit;
093            private int rateDuration = 1000;
094            private boolean allowReplayWhenNoConsumers = true;
095            private int replayDelay = 1000;
096    
097            private int matchCount;
098            private long rateDurationEnd;
099    
100            @Override
101            protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
102                boolean match = true;
103                if (mec.getDestination().isQueue() && contains(message.getBrokerPath(), networkBrokerId)) {
104                    // potential replay back to origin
105                    match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
106    
107                    if (match && LOG.isTraceEnabled()) {
108                        LOG.trace("Replaying  [" + message.getMessageId() + "] for [" + message.getDestination()
109                                + "] back to origin in the absence of a local consumer");
110                    }
111    
112                } else {
113                    // use existing filter logic for topics and non replays
114                    match = super.matchesForwardingFilter(message, mec);
115                }
116    
117                if (match && rateLimitExceeded()) {
118                    if (LOG.isTraceEnabled()) {
119                        LOG.trace("Throttled network consumer rejecting [" + message.getMessageId() + "] for [" + message.getDestination() + " " + matchCount
120                                + ">" + rateLimit + "/" + rateDuration);
121                    }
122                    match = false;
123                }
124    
125                return match;
126            }
127    
128            private boolean hasNotJustArrived(Message message) {
129                return replayDelay == 0 || (message.getBrokerInTime() + replayDelay < System.currentTimeMillis());
130            }
131    
132            private boolean hasNoLocalConsumers(final Message message, final MessageEvaluationContext mec) {
133                Destination regionDestination = (Destination) mec.getMessageReference().getRegionDestination();
134                List<Subscription> consumers = regionDestination.getConsumers();
135                for (Subscription sub : consumers) {
136                    if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
137                        if (LOG.isTraceEnabled()) {
138                            LOG.trace("Not replaying [" + message.getMessageId() + "] for [" + message.getDestination()
139                                    + "] to origin due to existing local consumer: " + sub.getConsumerInfo());
140                        }
141                        return false;
142                    }
143                }
144                return true;
145            }
146    
147            private boolean rateLimitExceeded() {
148                if (rateLimit == 0) {
149                    return false;
150                }
151    
152                if (rateDurationEnd < System.currentTimeMillis()) {
153                    rateDurationEnd = System.currentTimeMillis() + rateDuration;
154                    matchCount = 0;
155                }
156                return ++matchCount > rateLimit;
157            }
158    
159            public void setReplayDelay(int replayDelay) {
160                this.replayDelay = replayDelay;
161            }
162    
163            public void setRateLimit(int rateLimit) {
164                this.rateLimit = rateLimit;
165            }
166    
167            public void setRateDuration(int rateDuration) {
168                this.rateDuration = rateDuration;
169            }
170    
171            public void setAllowReplayWhenNoConsumers(boolean allowReplayWhenNoConsumers) {
172                this.allowReplayWhenNoConsumers = allowReplayWhenNoConsumers;
173            }
174        }
175    }