001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.util;
018    
019    import java.io.IOException;
020    import org.apache.activemq.RedeliveryPolicy;
021    import org.apache.activemq.ScheduledMessage;
022    import org.apache.activemq.broker.Broker;
023    import org.apache.activemq.broker.BrokerPluginSupport;
024    import org.apache.activemq.broker.ConnectionContext;
025    import org.apache.activemq.broker.ProducerBrokerExchange;
026    import org.apache.activemq.broker.region.MessageReference;
027    import org.apache.activemq.broker.region.Subscription;
028    import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
029    import org.apache.activemq.command.ActiveMQDestination;
030    import org.apache.activemq.command.ActiveMQQueue;
031    import org.apache.activemq.command.ActiveMQTopic;
032    import org.apache.activemq.command.Message;
033    import org.apache.activemq.command.ProducerInfo;
034    import org.apache.activemq.filter.AnyDestination;
035    import org.apache.activemq.state.ProducerState;
036    import org.apache.activemq.util.BrokerSupport;
037    import org.slf4j.Logger;
038    import org.slf4j.LoggerFactory;
039    
040    /**
041     * Replace regular DLQ handling with redelivery via a resend to the original destination
042     * after a delay
043     * A destination matching RedeliveryPolicy controls the quantity and delay for re-sends
044     * If there is no matching policy or an existing policy limit is exceeded by default
045     * regular DLQ processing resumes. This is controlled via sendToDlqIfMaxRetriesExceeded
046     * and fallbackToDeadLetter
047     *
048     * @org.apache.xbean.XBean element="redeliveryPlugin"
049     */
050    public class RedeliveryPlugin extends BrokerPluginSupport {
051        private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPlugin.class);
052        public static final String REDELIVERY_DELAY = "redeliveryDelay";
053    
054        RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
055        boolean sendToDlqIfMaxRetriesExceeded = true;
056        private boolean fallbackToDeadLetter = true;
057    
058        @Override
059        public Broker installPlugin(Broker broker) throws Exception {
060            if (!broker.getBrokerService().isSchedulerSupport()) {
061                throw new IllegalStateException("RedeliveryPlugin requires schedulerSupport=true on the broker");
062            }
063            validatePolicyDelay(1000);
064            return super.installPlugin(broker);
065        }
066    
067        /*
068         * sending to dlq is called as part of a poison ack processing, before the message is acknowledged  and removed
069         * by the destination so a delay is vital to avoid resending before it has been consumed
070         */
071        private void validatePolicyDelay(long limit) {
072            final ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
073            for (Object entry : redeliveryPolicyMap.get(matchAll)) {
074                RedeliveryPolicy redeliveryPolicy = (RedeliveryPolicy) entry;
075                validateLimit(limit, redeliveryPolicy);
076            }
077            RedeliveryPolicy defaultEntry = redeliveryPolicyMap.getDefaultEntry();
078            if (defaultEntry != null) {
079                validateLimit(limit, defaultEntry);
080            }
081        }
082    
083        private void validateLimit(long limit, RedeliveryPolicy redeliveryPolicy) {
084            if (redeliveryPolicy.getInitialRedeliveryDelay() < limit) {
085                throw new IllegalStateException("RedeliveryPolicy initialRedeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
086            }
087            if (redeliveryPolicy.getRedeliveryDelay() < limit) {
088                throw new IllegalStateException("RedeliveryPolicy redeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
089            }
090        }
091    
092        public RedeliveryPolicyMap getRedeliveryPolicyMap() {
093            return redeliveryPolicyMap;
094        }
095    
096        public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
097            this.redeliveryPolicyMap = redeliveryPolicyMap;
098        }
099    
100        public boolean isSendToDlqIfMaxRetriesExceeded() {
101            return sendToDlqIfMaxRetriesExceeded;
102        }
103    
104        /**
105         * What to do if the maxretries on a matching redelivery policy is exceeded.
106         * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
107         * when false, there is no action
108         * @param sendToDlqIfMaxRetriesExceeded
109         */
110        public void setSendToDlqIfMaxRetriesExceeded(boolean sendToDlqIfMaxRetriesExceeded) {
111            this.sendToDlqIfMaxRetriesExceeded = sendToDlqIfMaxRetriesExceeded;
112        }
113    
114        public boolean isFallbackToDeadLetter() {
115            return fallbackToDeadLetter;
116        }
117    
118        /**
119         * What to do if there is no matching redelivery policy for a destination.
120         * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
121         * when false, there is no action
122         * @param fallbackToDeadLetter
123         */
124        public void setFallbackToDeadLetter(boolean fallbackToDeadLetter) {
125            this.fallbackToDeadLetter = fallbackToDeadLetter;
126        }
127    
128        @Override
129        public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
130            if (next.get().isExpired(messageReference)) {
131                // there are two uses of  sendToDeadLetterQueue, we are only interested in valid messages
132                super.sendToDeadLetterQueue(context, messageReference, subscription);
133            } else {
134                try {
135                    final RedeliveryPolicy redeliveryPolicy = redeliveryPolicyMap.getEntryFor(messageReference.getRegionDestination().getActiveMQDestination());
136                    if (redeliveryPolicy != null) {
137                        int redeliveryCount = messageReference.getRedeliveryCounter();
138                        if (redeliveryCount < redeliveryPolicy.getMaximumRedeliveries()) {
139    
140                            long delay = ( redeliveryCount == 0 ?
141                                    redeliveryPolicy.getInitialRedeliveryDelay() :
142                                    redeliveryPolicy.getNextRedeliveryDelay(getExistingDelay(messageReference)));
143    
144                            scheduleRedelivery(context, messageReference, delay, ++redeliveryCount);
145                        } else if (isSendToDlqIfMaxRetriesExceeded()) {
146                            super.sendToDeadLetterQueue(context, messageReference, subscription);
147                        } else {
148                            LOG.debug("Discarding message that exceeds max redelivery count, " + messageReference.getMessageId());
149                        }
150                    } else if (isFallbackToDeadLetter()) {
151                        super.sendToDeadLetterQueue(context, messageReference, subscription);
152                    } else {
153                        LOG.debug("Ignoring dlq request for:" + messageReference.getMessageId()  + ", RedeliveryPolicy not found (and no fallback) for: " + messageReference.getRegionDestination().getActiveMQDestination());
154                    }
155                } catch (Exception exception) {
156                    // abort the ack, will be effective if client use transactions or individual ack with sync send
157                    RuntimeException toThrow =  new RuntimeException("Failed to schedule redelivery for: " + messageReference.getMessageId(), exception);
158                    LOG.error(toThrow.toString(), exception);
159                    throw toThrow;
160                }
161            }
162        }
163    
164        private void scheduleRedelivery(ConnectionContext context, MessageReference messageReference, long delay, int redeliveryCount) throws Exception {
165            if (LOG.isTraceEnabled()) {
166                LOG.trace("redelivery #" + redeliveryCount + " of: " + messageReference.getMessageId() + " with delay: "
167                        + delay + ", dest: " + messageReference.getRegionDestination().getActiveMQDestination());
168            }
169            final Message old = messageReference.getMessage();
170            Message message = old.copy();
171    
172            message.setTransactionId(null);
173            message.setMemoryUsage(null);
174            message.setMarshalledProperties(null);
175            message.removeProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
176    
177            message.setProperty(REDELIVERY_DELAY, delay);
178            message.setProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
179            message.setRedeliveryCounter(redeliveryCount);
180    
181            boolean originalFlowControl = context.isProducerFlowControl();
182            try {
183                context.setProducerFlowControl(false);
184                ProducerInfo info = new ProducerInfo();
185                ProducerState state = new ProducerState(info);
186                ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
187                producerExchange.setProducerState(state);
188                producerExchange.setMutable(true);
189                producerExchange.setConnectionContext(context);
190                context.getBroker().send(producerExchange, message);
191            } finally {
192                context.setProducerFlowControl(originalFlowControl);
193            }
194        }
195    
196        private int getExistingDelay(MessageReference messageReference) throws IOException {
197            Object val = messageReference.getMessage().getProperty(REDELIVERY_DELAY);
198            if (val instanceof Long) {
199                return ((Long)val).intValue();
200            }
201            return 0;
202        }
203    
204    }