001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017     package org.apache.activemq.plugin;
018    
019    import java.util.regex.Pattern;
020    import org.apache.activemq.broker.Broker;
021    import org.apache.activemq.broker.BrokerFilter;
022    import org.apache.activemq.broker.ConnectionContext;
023    import org.apache.activemq.broker.region.MessageReference;
024    import org.apache.activemq.broker.region.Subscription;
025    import org.apache.activemq.command.ActiveMQDestination;
026    import org.apache.activemq.command.Message;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    /**
031     * @author Filip Hanik
032     * @version 1.0
033     */
034    public class DiscardingDLQBroker extends BrokerFilter {
035        public static Logger log = LoggerFactory.getLogger(DiscardingDLQBroker.class);
036        private boolean dropTemporaryTopics = true;
037        private boolean dropTemporaryQueues = true;
038        private boolean dropAll = true;
039        private Pattern[] destFilter;
040        private int reportInterval = 1000;
041        private long dropCount = 0;
042    
043        public DiscardingDLQBroker(Broker next) {
044            super(next);
045        }
046    
047        @Override
048        public void sendToDeadLetterQueue(ConnectionContext ctx, MessageReference msgRef,
049                                          Subscription subscription) {
050            if (log.isTraceEnabled()) {
051                log.trace("Discarding DLQ BrokerFilter[pass through] - skipping message:" + (msgRef != null ? msgRef.getMessage() : null));
052            }
053            boolean dropped = true;
054            Message msg = null;
055            ActiveMQDestination dest = null;
056            String destName = null;
057            msg = msgRef.getMessage();
058            dest = msg.getDestination();
059            destName = dest.getPhysicalName();
060    
061            if (dest == null || destName == null ) {
062                //do nothing, no need to forward it
063                skipMessage("NULL DESTINATION",msgRef);
064            } else if (dropAll) {
065                //do nothing
066                skipMessage("dropAll",msgRef);
067            } else if (dropTemporaryTopics && dest.isTemporary() && dest.isTopic()) {
068                //do nothing
069                skipMessage("dropTemporaryTopics",msgRef);
070            } else if (dropTemporaryQueues && dest.isTemporary() && dest.isQueue()) {
071                //do nothing
072                skipMessage("dropTemporaryQueues",msgRef);
073            } else if (destFilter!=null && matches(destName)) {
074                //do nothing
075                skipMessage("dropOnly",msgRef);
076            } else {
077                dropped = false;
078                next.sendToDeadLetterQueue(ctx, msgRef, subscription);
079            }
080            if (dropped && getReportInterval()>0) {
081                if ((++dropCount)%getReportInterval() == 0 ) {
082                    log.info("Total of "+dropCount+" messages were discarded, since their destination was the dead letter queue");
083                }
084            }
085        }
086    
087        public boolean matches(String destName) {
088            for (int i=0; destFilter!=null && i<destFilter.length; i++) {
089                if (destFilter[i]!=null && destFilter[i].matcher(destName).matches()) {
090                    return true;
091                }
092            }
093            return false;
094        }
095    
096        private void skipMessage(String prefix, MessageReference msgRef) {
097            if (log.isDebugEnabled()) {
098                String lmsg = "Discarding DLQ BrokerFilter["+prefix+"] - skipping message:" + (msgRef!=null?msgRef.getMessage():null);
099                log.debug(lmsg);
100            }
101        }
102    
103        public void setDropTemporaryTopics(boolean dropTemporaryTopics) {
104            this.dropTemporaryTopics = dropTemporaryTopics;
105        }
106    
107        public void setDropTemporaryQueues(boolean dropTemporaryQueues) {
108            this.dropTemporaryQueues = dropTemporaryQueues;
109        }
110    
111        public void setDropAll(boolean dropAll) {
112            this.dropAll = dropAll;
113        }
114    
115        public void setDestFilter(Pattern[] destFilter) {
116            this.destFilter = destFilter;
117        }
118    
119        public void setReportInterval(int reportInterval) {
120            this.reportInterval = reportInterval;
121        }
122    
123        public boolean isDropTemporaryTopics() {
124            return dropTemporaryTopics;
125        }
126    
127        public boolean isDropTemporaryQueues() {
128            return dropTemporaryQueues;
129        }
130    
131        public boolean isDropAll() {
132            return dropAll;
133        }
134    
135        public Pattern[] getDestFilter() {
136            return destFilter;
137        }
138    
139        public int getReportInterval() {
140            return reportInterval;
141        }
142    
143    }