001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.policy;
018    
019    import org.apache.activemq.ActiveMQMessageAudit;
020    import org.apache.activemq.broker.region.MessageReference;
021    import org.apache.activemq.broker.region.Subscription;
022    import org.apache.activemq.command.ActiveMQDestination;
023    import org.apache.activemq.command.Message;
024    import org.slf4j.Logger;
025    import org.slf4j.LoggerFactory;
026    
027    /**
028     * A strategy for choosing which destination is used for dead letter queue
029     * messages.
030     * 
031     * 
032     */
033    public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
034        private static final Logger LOG = LoggerFactory.getLogger(AbstractDeadLetterStrategy.class);
035        private boolean processNonPersistent = false;
036        private boolean processExpired = true;
037        private boolean enableAudit = true;
038        private ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit();
039    
040        public boolean isSendToDeadLetterQueue(Message message) {
041            boolean result = false;
042            if (message != null) {
043                result = true;
044                if (enableAudit && messageAudit.isDuplicate(message)) {
045                    result = false;
046                    if (LOG.isDebugEnabled()) {
047                        LOG.debug("Not adding duplicate to DLQ: " + message.getMessageId() + ", dest: " + message.getDestination());
048                    }
049                }
050                if (!message.isPersistent() && !processNonPersistent) {
051                    result = false;
052                }
053                if (message.isExpired() && !processExpired) {
054                    result = false;
055                }
056            }
057            return result;
058        }
059    
060        /**
061         * @return the processExpired
062         */
063        public boolean isProcessExpired() {
064            return this.processExpired;
065        }
066    
067        /**
068         * @param processExpired the processExpired to set
069         */
070        public void setProcessExpired(boolean processExpired) {
071            this.processExpired = processExpired;
072        }
073    
074        /**
075         * @return the processNonPersistent
076         */
077        public boolean isProcessNonPersistent() {
078            return this.processNonPersistent;
079        }
080    
081        /**
082         * @param processNonPersistent the processNonPersistent to set
083         */
084        public void setProcessNonPersistent(boolean processNonPersistent) {
085            this.processNonPersistent = processNonPersistent;
086        }
087    
088        public boolean isEnableAudit() {
089            return enableAudit;
090        }
091    
092        public void setEnableAudit(boolean enableAudit) {
093            this.enableAudit = enableAudit;
094        }
095    }