001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.util;
018
019 import java.io.IOException;
020 import org.apache.activemq.RedeliveryPolicy;
021 import org.apache.activemq.ScheduledMessage;
022 import org.apache.activemq.broker.Broker;
023 import org.apache.activemq.broker.BrokerPluginSupport;
024 import org.apache.activemq.broker.ConnectionContext;
025 import org.apache.activemq.broker.ProducerBrokerExchange;
026 import org.apache.activemq.broker.region.MessageReference;
027 import org.apache.activemq.broker.region.Subscription;
028 import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.ActiveMQQueue;
031 import org.apache.activemq.command.ActiveMQTopic;
032 import org.apache.activemq.command.Message;
033 import org.apache.activemq.command.ProducerInfo;
034 import org.apache.activemq.filter.AnyDestination;
035 import org.apache.activemq.state.ProducerState;
036 import org.apache.activemq.util.BrokerSupport;
037 import org.slf4j.Logger;
038 import org.slf4j.LoggerFactory;
039
040 /**
041 * Replace regular DLQ handling with redelivery via a resend to the original destination
042 * after a delay
043 * A destination matching RedeliveryPolicy controls the quantity and delay for re-sends
044 * If there is no matching policy or an existing policy limit is exceeded by default
045 * regular DLQ processing resumes. This is controlled via sendToDlqIfMaxRetriesExceeded
046 * and fallbackToDeadLetter
047 *
048 * @org.apache.xbean.XBean element="redeliveryPlugin"
049 */
050 public class RedeliveryPlugin extends BrokerPluginSupport {
051 private static final Logger LOG = LoggerFactory.getLogger(RedeliveryPlugin.class);
052 public static final String REDELIVERY_DELAY = "redeliveryDelay";
053
054 RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
055 boolean sendToDlqIfMaxRetriesExceeded = true;
056 private boolean fallbackToDeadLetter = true;
057
058 @Override
059 public Broker installPlugin(Broker broker) throws Exception {
060 if (!broker.getBrokerService().isSchedulerSupport()) {
061 throw new IllegalStateException("RedeliveryPlugin requires schedulerSupport=true on the broker");
062 }
063 validatePolicyDelay(1000);
064 return super.installPlugin(broker);
065 }
066
067 /*
068 * sending to dlq is called as part of a poison ack processing, before the message is acknowledged and removed
069 * by the destination so a delay is vital to avoid resending before it has been consumed
070 */
071 private void validatePolicyDelay(long limit) {
072 final ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
073 for (Object entry : redeliveryPolicyMap.get(matchAll)) {
074 RedeliveryPolicy redeliveryPolicy = (RedeliveryPolicy) entry;
075 validateLimit(limit, redeliveryPolicy);
076 }
077 RedeliveryPolicy defaultEntry = redeliveryPolicyMap.getDefaultEntry();
078 if (defaultEntry != null) {
079 validateLimit(limit, defaultEntry);
080 }
081 }
082
083 private void validateLimit(long limit, RedeliveryPolicy redeliveryPolicy) {
084 if (redeliveryPolicy.getInitialRedeliveryDelay() < limit) {
085 throw new IllegalStateException("RedeliveryPolicy initialRedeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
086 }
087 if (redeliveryPolicy.getRedeliveryDelay() < limit) {
088 throw new IllegalStateException("RedeliveryPolicy redeliveryDelay must exceed: " + limit + ". " + redeliveryPolicy);
089 }
090 }
091
092 public RedeliveryPolicyMap getRedeliveryPolicyMap() {
093 return redeliveryPolicyMap;
094 }
095
096 public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
097 this.redeliveryPolicyMap = redeliveryPolicyMap;
098 }
099
100 public boolean isSendToDlqIfMaxRetriesExceeded() {
101 return sendToDlqIfMaxRetriesExceeded;
102 }
103
104 /**
105 * What to do if the maxretries on a matching redelivery policy is exceeded.
106 * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
107 * when false, there is no action
108 * @param sendToDlqIfMaxRetriesExceeded
109 */
110 public void setSendToDlqIfMaxRetriesExceeded(boolean sendToDlqIfMaxRetriesExceeded) {
111 this.sendToDlqIfMaxRetriesExceeded = sendToDlqIfMaxRetriesExceeded;
112 }
113
114 public boolean isFallbackToDeadLetter() {
115 return fallbackToDeadLetter;
116 }
117
118 /**
119 * What to do if there is no matching redelivery policy for a destination.
120 * when true, the region broker DLQ processing will be used via sendToDeadLetterQueue
121 * when false, there is no action
122 * @param fallbackToDeadLetter
123 */
124 public void setFallbackToDeadLetter(boolean fallbackToDeadLetter) {
125 this.fallbackToDeadLetter = fallbackToDeadLetter;
126 }
127
128 @Override
129 public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
130 if (next.get().isExpired(messageReference)) {
131 // there are two uses of sendToDeadLetterQueue, we are only interested in valid messages
132 super.sendToDeadLetterQueue(context, messageReference, subscription);
133 } else {
134 try {
135 final RedeliveryPolicy redeliveryPolicy = redeliveryPolicyMap.getEntryFor(messageReference.getRegionDestination().getActiveMQDestination());
136 if (redeliveryPolicy != null) {
137 int redeliveryCount = messageReference.getRedeliveryCounter();
138 if (redeliveryCount < redeliveryPolicy.getMaximumRedeliveries()) {
139
140 long delay = ( redeliveryCount == 0 ?
141 redeliveryPolicy.getInitialRedeliveryDelay() :
142 redeliveryPolicy.getNextRedeliveryDelay(getExistingDelay(messageReference)));
143
144 scheduleRedelivery(context, messageReference, delay, ++redeliveryCount);
145 } else if (isSendToDlqIfMaxRetriesExceeded()) {
146 super.sendToDeadLetterQueue(context, messageReference, subscription);
147 } else {
148 LOG.debug("Discarding message that exceeds max redelivery count, " + messageReference.getMessageId());
149 }
150 } else if (isFallbackToDeadLetter()) {
151 super.sendToDeadLetterQueue(context, messageReference, subscription);
152 } else {
153 LOG.debug("Ignoring dlq request for:" + messageReference.getMessageId() + ", RedeliveryPolicy not found (and no fallback) for: " + messageReference.getRegionDestination().getActiveMQDestination());
154 }
155 } catch (Exception exception) {
156 // abort the ack, will be effective if client use transactions or individual ack with sync send
157 RuntimeException toThrow = new RuntimeException("Failed to schedule redelivery for: " + messageReference.getMessageId(), exception);
158 LOG.error(toThrow.toString(), exception);
159 throw toThrow;
160 }
161 }
162 }
163
164 private void scheduleRedelivery(ConnectionContext context, MessageReference messageReference, long delay, int redeliveryCount) throws Exception {
165 if (LOG.isTraceEnabled()) {
166 LOG.trace("redelivery #" + redeliveryCount + " of: " + messageReference.getMessageId() + " with delay: "
167 + delay + ", dest: " + messageReference.getRegionDestination().getActiveMQDestination());
168 }
169 final Message old = messageReference.getMessage();
170 Message message = old.copy();
171
172 message.setTransactionId(null);
173 message.setMemoryUsage(null);
174 message.setMarshalledProperties(null);
175 message.removeProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
176
177 message.setProperty(REDELIVERY_DELAY, delay);
178 message.setProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
179 message.setRedeliveryCounter(redeliveryCount);
180
181 boolean originalFlowControl = context.isProducerFlowControl();
182 try {
183 context.setProducerFlowControl(false);
184 ProducerInfo info = new ProducerInfo();
185 ProducerState state = new ProducerState(info);
186 ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
187 producerExchange.setProducerState(state);
188 producerExchange.setMutable(true);
189 producerExchange.setConnectionContext(context);
190 context.getBroker().send(producerExchange, message);
191 } finally {
192 context.setProducerFlowControl(originalFlowControl);
193 }
194 }
195
196 private int getExistingDelay(MessageReference messageReference) throws IOException {
197 Object val = messageReference.getMessage().getProperty(REDELIVERY_DELAY);
198 if (val instanceof Long) {
199 return ((Long)val).intValue();
200 }
201 return 0;
202 }
203
204 }