001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.policy;
018    
019    import java.util.HashMap;
020    import java.util.Map;
021    import java.util.Map.Entry;
022    import java.util.concurrent.ConcurrentHashMap;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    
025    import org.apache.activemq.broker.Broker;
026    import org.apache.activemq.broker.Connection;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.broker.region.Subscription;
029    import org.apache.activemq.command.ConsumerControl;
030    import org.apache.activemq.thread.Scheduler;
031    import org.apache.activemq.transport.InactivityIOException;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    /**
036     * Abort slow consumers when they reach the configured threshold of slowness, default is slow for 30 seconds
037     * 
038     * @org.apache.xbean.XBean
039     */
040    public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable {
041        
042        private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategy.class);
043    
044        private String name = "AbortSlowConsumerStrategy@" + hashCode();
045        private Scheduler scheduler;
046        private Broker broker;
047        private final AtomicBoolean taskStarted = new AtomicBoolean(false);
048        private final Map<Subscription, SlowConsumerEntry> slowConsumers = new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
049    
050        private long maxSlowCount = -1;
051        private long maxSlowDuration = 30*1000;
052        private long checkPeriod = 30*1000;
053        private boolean abortConnection = false;
054    
055        public void setBrokerService(Broker broker) {
056           this.scheduler = broker.getScheduler();
057           this.broker = broker;
058        }
059    
060        public void slowConsumer(ConnectionContext context, Subscription subs) {
061            if (maxSlowCount < 0 && maxSlowDuration < 0) {
062                // nothing to do
063                LOG.info("no limits set, slowConsumer strategy has nothing to do");
064                return;
065            }
066            
067            if (taskStarted.compareAndSet(false, true)) {
068                scheduler.executePeriodically(this, checkPeriod);
069            }
070                
071            if (!slowConsumers.containsKey(subs)) {
072                slowConsumers.put(subs, new SlowConsumerEntry(context));
073            } else if (maxSlowCount > 0) {
074                slowConsumers.get(subs).slow();
075            }
076        }
077    
078        public void run() {
079            if (maxSlowDuration > 0) {
080                // mark
081                for (SlowConsumerEntry entry : slowConsumers.values()) {
082                    entry.mark();
083                }
084            }
085            
086            HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
087            for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
088                if (entry.getKey().isSlowConsumer()) {
089                    if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod > maxSlowDuration)
090                            || maxSlowCount > 0 && entry.getValue().slowCount > maxSlowCount) { 
091                        toAbort.put(entry.getKey(), entry.getValue());
092                        slowConsumers.remove(entry.getKey());
093                    }
094                } else {
095                    LOG.info("sub: " + entry.getKey().getConsumerInfo().getConsumerId() + " is no longer slow");
096                    slowConsumers.remove(entry.getKey());
097                }
098            }
099    
100            abortSubscription(toAbort, abortConnection);
101        }
102    
103        private void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
104            for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
105                ConnectionContext connectionContext = entry.getValue().context;
106                if (connectionContext!= null) {
107                    try {
108                        LOG.info("aborting "
109                                + (abortSubscriberConnection ? "connection" : "consumer") 
110                                + ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
111    
112                        final Connection connection = connectionContext.getConnection();
113                        if (connection != null) {
114                            if (abortSubscriberConnection) {
115                                scheduler.executeAfterDelay(new Runnable() {
116                                    public void run() {
117                                        connection.serviceException(new InactivityIOException("Consumer was slow too often (>"
118                                                + maxSlowCount +  ") or too long (>"
119                                                + maxSlowDuration + "): " + entry.getKey().getConsumerInfo().getConsumerId()));
120                                    }}, 0l);
121                            } else {
122                                // just abort the consumer by telling it to stop
123                                ConsumerControl stopConsumer = new ConsumerControl();
124                                stopConsumer.setConsumerId(entry.getKey().getConsumerInfo().getConsumerId());
125                                stopConsumer.setClose(true);
126                                connection.dispatchAsync(stopConsumer);
127                            }
128                        } else {
129                            LOG.debug("slowConsumer abort ignored, no connection in context:"  + connectionContext);
130                        }
131                    } catch (Exception e) {
132                        LOG.info("exception on stopping "
133                                + (abortSubscriberConnection ? "connection" : "consumer")
134                                + " to abort slow consumer: " + entry.getKey(), e);
135                    }
136                }
137            }
138        }
139    
140    
141        public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) {
142            if (sub != null) {
143                SlowConsumerEntry entry = slowConsumers.remove(sub);
144                if (entry != null) {
145                    Map toAbort = new HashMap<Subscription, SlowConsumerEntry>();
146                    toAbort.put(sub, entry);
147                    abortSubscription(toAbort, abortSubscriberConnection);
148                } else {
149                    LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + sub);
150                }
151            }
152        }
153    
154    
155        public long getMaxSlowCount() {
156            return maxSlowCount;
157        }
158    
159        /**
160         * number of times a subscription can be deemed slow before triggering abort
161         * effect depends on dispatch rate as slow determination is done on dispatch
162         */
163        public void setMaxSlowCount(long maxSlowCount) {
164            this.maxSlowCount = maxSlowCount;
165        }
166    
167        public long getMaxSlowDuration() {
168            return maxSlowDuration;
169        }
170    
171        /**
172         * time in milliseconds that a sub can remain slow before triggering
173         * an abort.
174         * @param maxSlowDuration
175         */
176        public void setMaxSlowDuration(long maxSlowDuration) {
177            this.maxSlowDuration = maxSlowDuration;
178        }
179    
180        public long getCheckPeriod() {
181            return checkPeriod;
182        }
183    
184        /**
185         * time in milliseconds between checks for slow subscriptions
186         * @param checkPeriod
187         */
188        public void setCheckPeriod(long checkPeriod) {
189            this.checkPeriod = checkPeriod;
190        }
191    
192        public boolean isAbortConnection() {
193            return abortConnection;
194        }
195    
196        /**
197         * abort the consumers connection rather than sending a stop command to the remote consumer
198         * @param abortConnection
199         */
200        public void setAbortConnection(boolean abortConnection) {
201            this.abortConnection = abortConnection;
202        }
203    
204        public void setName(String name) {
205            this.name = name;
206        }
207        
208        public String getName() {
209            return name;
210        }
211    
212        public Map<Subscription, SlowConsumerEntry> getSlowConsumers() {
213            return slowConsumers;
214        }
215    }