001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.util;
018
019import java.io.IOException;
020
021import org.apache.activemq.RedeliveryPolicy;
022import org.apache.activemq.ScheduledMessage;
023import org.apache.activemq.broker.Broker;
024import org.apache.activemq.broker.BrokerPluginSupport;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.ProducerBrokerExchange;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.MessageReference;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
031import org.apache.activemq.command.ActiveMQDestination;
032import org.apache.activemq.command.ActiveMQQueue;
033import org.apache.activemq.command.ActiveMQTopic;
034import org.apache.activemq.command.Message;
035import org.apache.activemq.command.ProducerInfo;
036import org.apache.activemq.filter.AnyDestination;
037import org.apache.activemq.state.ProducerState;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * Replace regular DLQ handling with redelivery via a resend to the original destination
043 * after a delay
044 * A destination matching RedeliveryPolicy controls the quantity and delay for re-sends
045 * If there is no matching policy or an existing policy limit is exceeded by default
046 * regular DLQ processing resumes. This is controlled via sendToDlqIfMaxRetriesExceeded
047 * and fallbackToDeadLetter
048 *
049 * @org.apache.xbean.XBean element="redeliveryPlugin"
050 */
051public class RedeliveryPlugin extends BrokerPluginSupport {
052    private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPlugin.class);
053    public static final String REDELIVERY_DELAY = "redeliveryDelay";
054
055    RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
056    boolean sendToDlqIfMaxRetriesExceeded = true;
057    private boolean fallbackToDeadLetter = true;
058
059    @Override
060    public Broker installPlugin(Broker broker) throws Exception {
061        if (!broker.getBrokerService().isSchedulerSupport()) {
062            throw new IllegalStateException("RedeliveryPlugin requires schedulerSupport=true on the broker");
063        }
064        validatePolicyDelay(1000);
065        return super.installPlugin(broker);
066    }
067
068    /*
069     * sending to dlq is called as part of a poison ack processing, before the message is acknowledged  and removed
070     * by the destination so a delay is vital to avoid resending before it has been consumed
071     */
072    private void validatePolicyDelay(long limit) {
073        final ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
074        for (Object entry : redeliveryPolicyMap.get(matchAll)) {
075            RedeliveryPolicy redeliveryPolicy = (RedeliveryPolicy) entry;
076            validateLimit(limit, redeliveryPolicy);
077        }
078        RedeliveryPolicy defaultEntry = redeliveryPolicyMap.getDefaultEntry();
079        if (defaultEntry != null) {
080            validateLimit(limit, defaultEntry);
081        }
082    }
083
084    private void validateLimit(long limit, RedeliveryPolicy redeliveryPolicy) {
085        if (redeliveryPolicy.getInitialRedeliveryDelay() < limit) {
086            throw new IllegalStateException("RedeliveryPolicy initialRedeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
087        }
088        if (redeliveryPolicy.getRedeliveryDelay() < limit) {
089            throw new IllegalStateException("RedeliveryPolicy redeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
090        }
091    }
092
093    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
094        return redeliveryPolicyMap;
095    }
096
097    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
098        this.redeliveryPolicyMap = redeliveryPolicyMap;
099    }
100
101    public boolean isSendToDlqIfMaxRetriesExceeded() {
102        return sendToDlqIfMaxRetriesExceeded;
103    }
104
105    /**
106     * What to do if the maxretries on a matching redelivery policy is exceeded.
107     * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
108     * when false, there is no action
109     * @param sendToDlqIfMaxRetriesExceeded
110     */
111    public void setSendToDlqIfMaxRetriesExceeded(boolean sendToDlqIfMaxRetriesExceeded) {
112        this.sendToDlqIfMaxRetriesExceeded = sendToDlqIfMaxRetriesExceeded;
113    }
114
115    public boolean isFallbackToDeadLetter() {
116        return fallbackToDeadLetter;
117    }
118
119    /**
120     * What to do if there is no matching redelivery policy for a destination.
121     * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
122     * when false, there is no action
123     * @param fallbackToDeadLetter
124     */
125    public void setFallbackToDeadLetter(boolean fallbackToDeadLetter) {
126        this.fallbackToDeadLetter = fallbackToDeadLetter;
127    }
128
129    @Override
130    public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
131        if (messageReference.isExpired()) {
132            // there are two uses of  sendToDeadLetterQueue, we are only interested in valid messages
133            return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
134        } else {
135            try {
136                Destination regionDestination = (Destination) messageReference.getRegionDestination();
137                final RedeliveryPolicy redeliveryPolicy = redeliveryPolicyMap.getEntryFor(regionDestination.getActiveMQDestination());
138                if (redeliveryPolicy != null) {
139                    final int maximumRedeliveries = redeliveryPolicy.getMaximumRedeliveries();
140                    int redeliveryCount = messageReference.getRedeliveryCounter();
141                    if (RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES == maximumRedeliveries || redeliveryCount < maximumRedeliveries) {
142
143                        long delay = redeliveryPolicy.getInitialRedeliveryDelay();
144                        for (int i = 0; i < redeliveryCount; i++) {
145                            delay = redeliveryPolicy.getNextRedeliveryDelay(delay);
146                        }
147
148                        scheduleRedelivery(context, messageReference, delay, ++redeliveryCount);
149                    } else if (isSendToDlqIfMaxRetriesExceeded()) {
150                        return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
151                    } else {
152                        LOG.debug("Discarding message that exceeds max redelivery count({}), {}", maximumRedeliveries, messageReference.getMessageId());
153                    }
154                } else if (isFallbackToDeadLetter()) {
155                    return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
156                } else {
157                    LOG.debug("Ignoring dlq request for: {}, RedeliveryPolicy not found (and no fallback) for: {}", messageReference.getMessageId(), regionDestination.getActiveMQDestination());
158                }
159
160                return false;
161            } catch (Exception exception) {
162                // abort the ack, will be effective if client use transactions or individual ack with sync send
163                RuntimeException toThrow =  new RuntimeException("Failed to schedule redelivery for: " + messageReference.getMessageId(), exception);
164                LOG.error(toThrow.toString(), exception);
165                throw toThrow;
166            }
167        }
168    }
169
170    private void scheduleRedelivery(ConnectionContext context, MessageReference messageReference, long delay, int redeliveryCount) throws Exception {
171        if (LOG.isTraceEnabled()) {
172            Destination regionDestination = (Destination) messageReference.getRegionDestination();
173            LOG.trace("redelivery #{} of: {} with delay: {}, dest: {}", new Object[]{
174                    redeliveryCount, messageReference.getMessageId(), delay, regionDestination.getActiveMQDestination()
175            });
176        }
177        final Message old = messageReference.getMessage();
178        Message message = old.copy();
179
180        message.setTransactionId(null);
181        message.setMemoryUsage(null);
182        message.removeProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
183
184        message.setProperty(REDELIVERY_DELAY, delay);
185        message.setProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
186        message.setRedeliveryCounter(redeliveryCount);
187
188        boolean originalFlowControl = context.isProducerFlowControl();
189        try {
190            context.setProducerFlowControl(false);
191            ProducerInfo info = new ProducerInfo();
192            ProducerState state = new ProducerState(info);
193            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
194            producerExchange.setProducerState(state);
195            producerExchange.setMutable(true);
196            producerExchange.setConnectionContext(context);
197            context.getBroker().send(producerExchange, message);
198        } finally {
199            context.setProducerFlowControl(originalFlowControl);
200        }
201    }
202
203}