001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.policy;
018    
019    import org.apache.activemq.broker.region.Destination;
020    import org.apache.activemq.broker.region.DurableTopicSubscription;
021    import org.apache.activemq.broker.region.Subscription;
022    import org.apache.activemq.command.ActiveMQDestination;
023    import org.apache.activemq.command.ActiveMQQueue;
024    import org.apache.activemq.command.ActiveMQTopic;
025    import org.apache.activemq.command.Message;
026    
027    /**
028     * A {@link DeadLetterStrategy} where each destination has its own individual
029     * DLQ using the subject naming hierarchy.
030     *
031     * @org.apache.xbean.XBean
032     *
033     */
034    public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
035    
036        private String topicPrefix = "ActiveMQ.DLQ.Topic.";
037        private String queuePrefix = "ActiveMQ.DLQ.Queue.";
038        private String topicSuffix;
039        private String queueSuffix;
040        private boolean useQueueForQueueMessages = true;
041        private boolean useQueueForTopicMessages = true;
042        private boolean destinationPerDurableSubscriber;
043    
044        public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
045            if (message.getDestination().isQueue()) {
046                return createDestination(message, queuePrefix, queueSuffix, useQueueForQueueMessages, subscription);
047            } else {
048                return createDestination(message, topicPrefix, topicSuffix, useQueueForTopicMessages, subscription);
049            }
050        }
051    
052        // Properties
053        // -------------------------------------------------------------------------
054    
055        public String getQueuePrefix() {
056            return queuePrefix;
057        }
058    
059        /**
060         * Sets the prefix to use for all dead letter queues for queue messages
061         */
062        public void setQueuePrefix(String queuePrefix) {
063            this.queuePrefix = queuePrefix;
064        }
065    
066        public String getTopicPrefix() {
067            return topicPrefix;
068        }
069    
070        /**
071         * Sets the prefix to use for all dead letter queues for topic messages
072         */
073        public void setTopicPrefix(String topicPrefix) {
074            this.topicPrefix = topicPrefix;
075        }
076    
077        public String getQueueSuffix() {
078            return queueSuffix;
079        }
080    
081        /**
082         * Sets the suffix to use for all dead letter queues for queue messages
083         */
084        public void setQueueSuffix(String queueSuffix) {
085            this.queueSuffix = queueSuffix;
086        }
087    
088        public String getTopicSuffix() {
089            return topicSuffix;
090        }
091    
092        /**
093         * Sets the suffix to use for all dead letter queues for topic messages
094         */
095        public void setTopicSuffix(String topicSuffix) {
096            this.topicSuffix = topicSuffix;
097        }
098    
099        public boolean isUseQueueForQueueMessages() {
100            return useQueueForQueueMessages;
101        }
102    
103        /**
104         * Sets whether a queue or topic should be used for queue messages sent to a
105         * DLQ. The default is to use a Queue
106         */
107        public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) {
108            this.useQueueForQueueMessages = useQueueForQueueMessages;
109        }
110    
111        public boolean isUseQueueForTopicMessages() {
112            return useQueueForTopicMessages;
113        }
114    
115        /**
116         * Sets whether a queue or topic should be used for topic messages sent to a
117         * DLQ. The default is to use a Queue
118         */
119        public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) {
120            this.useQueueForTopicMessages = useQueueForTopicMessages;
121        }
122    
123        public boolean isDestinationPerDurableSubscriber() {
124            return destinationPerDurableSubscriber;
125        }
126    
127        /**
128         * sets whether durable topic subscriptions are to get individual dead letter destinations.
129         * When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName'
130         * The default is false.
131         * @param destinationPerDurableSubscriber
132         */
133        public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber) {
134            this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
135        }
136    
137        // Implementation methods
138        // -------------------------------------------------------------------------
139        protected ActiveMQDestination createDestination(Message message,
140                                                        String prefix,
141                                                        String suffix,
142                                                        boolean useQueue,
143                                                        Subscription subscription ) {
144            String name = null;
145    
146            Destination regionDestination = (Destination) message.getRegionDestination();
147            if (regionDestination != null
148                    && regionDestination.getActiveMQDestination() != null
149                    && regionDestination.getActiveMQDestination().getPhysicalName() != null
150                    && !regionDestination.getActiveMQDestination().getPhysicalName().isEmpty()){
151                name = prefix + regionDestination.getActiveMQDestination().getPhysicalName();
152            } else {
153                name = prefix + message.getDestination().getPhysicalName();
154            }
155    
156            if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription) {
157                name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey();
158            }
159    
160            if (suffix != null && !suffix.isEmpty()) {
161                name += suffix;
162            }
163    
164            if (useQueue) {
165                return new ActiveMQQueue(name);
166            } else {
167                return new ActiveMQTopic(name);
168            }
169        }
170    }