001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.policy;
018
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.List;
022import java.util.Map;
023import java.util.Map.Entry;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.atomic.AtomicBoolean;
026
027import org.apache.activemq.broker.Broker;
028import org.apache.activemq.broker.Connection;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.broker.region.Destination;
031import org.apache.activemq.broker.region.Subscription;
032import org.apache.activemq.command.ConsumerControl;
033import org.apache.activemq.command.RemoveInfo;
034import org.apache.activemq.state.CommandVisitor;
035import org.apache.activemq.thread.Scheduler;
036import org.apache.activemq.transport.InactivityIOException;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
 * Aborts slow consumers once they reach a configured slowness threshold: either a maximum time spent slow
 * (default 30 seconds) or a maximum number of slow reports (disabled by default).
042 *
043 * @org.apache.xbean.XBean
044 */
045public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable {
046
047    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategy.class);
048
049    protected String name = "AbortSlowConsumerStrategy@" + hashCode();
050    protected Scheduler scheduler;
051    protected Broker broker;
052    protected final AtomicBoolean taskStarted = new AtomicBoolean(false);
053    protected final Map<Subscription, SlowConsumerEntry> slowConsumers =
054        new ConcurrentHashMap<Subscription, SlowConsumerEntry>();
055
056    private long maxSlowCount = -1;
057    private long maxSlowDuration = 30*1000;
058    private long checkPeriod = 30*1000;
059    private boolean abortConnection = false;
060    private boolean ignoreNetworkConsumers = true;
061
062    @Override
063    public void setBrokerService(Broker broker) {
064       this.scheduler = broker.getScheduler();
065       this.broker = broker;
066    }
067
068    @Override
069    public void slowConsumer(ConnectionContext context, Subscription subs) {
070        if (maxSlowCount < 0 && maxSlowDuration < 0) {
071            // nothing to do
072            LOG.info("no limits set, slowConsumer strategy has nothing to do");
073            return;
074        }
075
076        if (taskStarted.compareAndSet(false, true)) {
077            scheduler.executePeriodically(this, checkPeriod);
078        }
079
080        if (!slowConsumers.containsKey(subs)) {
081            slowConsumers.put(subs, new SlowConsumerEntry(context));
082        } else if (maxSlowCount > 0) {
083            slowConsumers.get(subs).slow();
084        }
085    }
086
087    @Override
088    public void run() {
089        if (maxSlowDuration > 0) {
090            // mark
091            for (SlowConsumerEntry entry : slowConsumers.values()) {
092                entry.mark();
093            }
094        }
095
096        HashMap<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
097        for (Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) {
098            Subscription subscription = entry.getKey();
099            if (isIgnoreNetworkSubscriptions() && subscription.getConsumerInfo().isNetworkSubscription()) {
100                if (slowConsumers.remove(subscription) != null) {
101                    LOG.info("network sub: {} is no longer slow", subscription.getConsumerInfo().getConsumerId());
102                }
103                continue;
104            }
105
106            if (entry.getKey().isSlowConsumer()) {
107                if (maxSlowDuration > 0 && (entry.getValue().markCount * checkPeriod >= maxSlowDuration)
108                        || maxSlowCount > 0 && entry.getValue().slowCount >= maxSlowCount) {
109                    toAbort.put(entry.getKey(), entry.getValue());
110                    slowConsumers.remove(entry.getKey());
111                }
112            } else {
113                LOG.info("sub: " + entry.getKey().getConsumerInfo().getConsumerId() + " is no longer slow");
114                slowConsumers.remove(entry.getKey());
115            }
116        }
117
118        abortSubscription(toAbort, abortConnection);
119    }
120
121    protected void abortSubscription(Map<Subscription, SlowConsumerEntry> toAbort, boolean abortSubscriberConnection) {
122
123        Map<Connection, List<Subscription>> abortMap = new HashMap<Connection, List<Subscription>>();
124
125        for (final Entry<Subscription, SlowConsumerEntry> entry : toAbort.entrySet()) {
126            ConnectionContext connectionContext = entry.getValue().context;
127            if (connectionContext == null) {
128                continue;
129            }
130
131            Connection connection = connectionContext.getConnection();
132            if (connection == null) {
133                LOG.debug("slowConsumer abort ignored, no connection in context:"  + connectionContext);
134            }
135
136            if (!abortMap.containsKey(connection)) {
137                abortMap.put(connection, new ArrayList<Subscription>());
138            }
139
140            abortMap.get(connection).add(entry.getKey());
141        }
142
143        for (Entry<Connection, List<Subscription>> entry : abortMap.entrySet()) {
144            final Connection connection = entry.getKey();
145            final List<Subscription> subscriptions = entry.getValue();
146
147            if (abortSubscriberConnection) {
148
149                LOG.info("aborting connection:{} with {} slow consumers",
150                         connection.getConnectionId(), subscriptions.size());
151
152                if (LOG.isTraceEnabled()) {
153                    for (Subscription subscription : subscriptions) {
154                        LOG.trace("Connection {} being aborted because of slow consumer: {} on destination: {}",
155                                  new Object[] { connection.getConnectionId(),
156                                                 subscription.getConsumerInfo().getConsumerId(),
157                                                 subscription.getActiveMQDestination() });
158                    }
159                }
160
161                try {
162                    scheduler.executeAfterDelay(new Runnable() {
163                        @Override
164                        public void run() {
165                            connection.serviceException(new InactivityIOException(
166                                    subscriptions.size() + " Consumers was slow too often (>"
167                                    + maxSlowCount +  ") or too long (>"
168                                    + maxSlowDuration + "): "));
169                        }}, 0l);
170                } catch (Exception e) {
171                    LOG.info("exception on aborting connection {} with {} slow consumers",
172                             connection.getConnectionId(), subscriptions.size());
173                }
174            } else {
175                // just abort each consumer
176                for (Subscription subscription : subscriptions) {
177                    final Subscription subToClose = subscription;
178                    LOG.info("aborting slow consumer: {} for destination:{}",
179                             subscription.getConsumerInfo().getConsumerId(),
180                             subscription.getActiveMQDestination());
181
182                    // tell the remote consumer to close
183                    try {
184                        ConsumerControl stopConsumer = new ConsumerControl();
185                        stopConsumer.setConsumerId(subscription.getConsumerInfo().getConsumerId());
186                        stopConsumer.setClose(true);
187                        connection.dispatchAsync(stopConsumer);
188                    } catch (Exception e) {
189                        LOG.info("exception on aborting slow consumer: {}", subscription.getConsumerInfo().getConsumerId(), e);
190                    }
191
192                    // force a local remove in case remote is unresponsive
193                    try {
194                        scheduler.executeAfterDelay(new Runnable() {
195                            @Override
196                            public void run() {
197                                try {
198                                    RemoveInfo removeCommand = subToClose.getConsumerInfo().createRemoveCommand();
199                                    if (connection instanceof CommandVisitor) {
200                                        // avoid service exception handling and logging
201                                        removeCommand.visit((CommandVisitor) connection);
202                                    } else {
203                                        connection.service(removeCommand);
204                                    }
205                                } catch (IllegalStateException ignoredAsRemoteHasDoneTheJob) {
206                                } catch (Exception e) {
207                                    LOG.info("exception on local remove of slow consumer: {}", subToClose.getConsumerInfo().getConsumerId(), e);
208                                }
209                            }}, 1000l);
210
211                    } catch (Exception e) {
212                        LOG.info("exception on local remove of slow consumer: {}", subscription.getConsumerInfo().getConsumerId(), e);
213                    }
214                }
215            }
216        }
217    }
218
219    public void abortConsumer(Subscription sub, boolean abortSubscriberConnection) {
220        if (sub != null) {
221            SlowConsumerEntry entry = slowConsumers.remove(sub);
222            if (entry != null) {
223                Map<Subscription, SlowConsumerEntry> toAbort = new HashMap<Subscription, SlowConsumerEntry>();
224                toAbort.put(sub, entry);
225                abortSubscription(toAbort, abortSubscriberConnection);
226            } else {
227                LOG.warn("cannot abort subscription as it no longer exists in the map of slow consumers: " + sub);
228            }
229        }
230    }
231
232    public long getMaxSlowCount() {
233        return maxSlowCount;
234    }
235
236    /**
237     * number of times a subscription can be deemed slow before triggering abort
238     * effect depends on dispatch rate as slow determination is done on dispatch
239     */
240    public void setMaxSlowCount(long maxSlowCount) {
241        this.maxSlowCount = maxSlowCount;
242    }
243
244    public long getMaxSlowDuration() {
245        return maxSlowDuration;
246    }
247
248    /**
249     * time in milliseconds that a sub can remain slow before triggering
250     * an abort.
251     * @param maxSlowDuration
252     */
253    public void setMaxSlowDuration(long maxSlowDuration) {
254        this.maxSlowDuration = maxSlowDuration;
255    }
256
257    public long getCheckPeriod() {
258        return checkPeriod;
259    }
260
261    /**
262     * time in milliseconds between checks for slow subscriptions
263     * @param checkPeriod
264     */
265    public void setCheckPeriod(long checkPeriod) {
266        this.checkPeriod = checkPeriod;
267    }
268
269    public boolean isAbortConnection() {
270        return abortConnection;
271    }
272
273    /**
274     * abort the consumers connection rather than sending a stop command to the remote consumer
275     * @param abortConnection
276     */
277    public void setAbortConnection(boolean abortConnection) {
278        this.abortConnection = abortConnection;
279    }
280
281    /**
282     * Returns whether the strategy is configured to ignore subscriptions that are from a network
283     * connection.
284     *
285     * @return true if the strategy will ignore network connection subscriptions when looking
286     *         for slow consumers.
287     */
288    public boolean isIgnoreNetworkSubscriptions() {
289        return ignoreNetworkConsumers;
290    }
291
292    /**
293     * Sets whether the strategy is configured to ignore consumers that are part of a network
294     * connection to another broker.
295     *
296     * When configured to not ignore idle consumers this strategy acts not only on consumers
297     * that are actually slow but also on any consumer that has not received any messages for
298     * the maxTimeSinceLastAck.  This allows for a way to evict idle consumers while also
299     * aborting slow consumers however for a network subscription this can create a lot of
300     * unnecessary churn and if the abort connection option is also enabled this can result
301     * in the entire network connection being torn down and rebuilt for no reason.
302     *
303     * @param ignoreNetworkConsumers
304     *      Should this strategy ignore subscriptions made by a network connector.
305     */
306    public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) {
307        this.ignoreNetworkConsumers = ignoreNetworkConsumers;
308    }
309
310    public void setName(String name) {
311        this.name = name;
312    }
313
314    public String getName() {
315        return name;
316    }
317
318    public Map<Subscription, SlowConsumerEntry> getSlowConsumers() {
319        return slowConsumers;
320    }
321
322    @Override
323    public void addDestination(Destination destination) {
324        // Not needed for this strategy.
325    }
326}