001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.policy;
018    
019    import org.apache.activemq.broker.region.DurableTopicSubscription;
020    import org.apache.activemq.broker.region.Subscription;
021    import org.apache.activemq.command.ActiveMQDestination;
022    import org.apache.activemq.command.ActiveMQQueue;
023    import org.apache.activemq.command.ActiveMQTopic;
024    import org.apache.activemq.command.Message;
025    
026    /**
027     * A {@link DeadLetterStrategy} where each destination has its own individual
028     * DLQ using the subject naming hierarchy.
029     *
030     * @org.apache.xbean.XBean
031     *
032     */
033    public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
034    
035        private String topicPrefix = "ActiveMQ.DLQ.Topic.";
036        private String queuePrefix = "ActiveMQ.DLQ.Queue.";
037        private String topicSuffix;
038        private String queueSuffix;
039        private boolean useQueueForQueueMessages = true;
040        private boolean useQueueForTopicMessages = true;
041        private boolean destinationPerDurableSubscriber;
042    
043        public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
044            if (message.getDestination().isQueue()) {
045                return createDestination(message, queuePrefix, queueSuffix, useQueueForQueueMessages, subscription);
046            } else {
047                return createDestination(message, topicPrefix, topicSuffix, useQueueForTopicMessages, subscription);
048            }
049        }
050    
051        // Properties
052        // -------------------------------------------------------------------------
053    
054        public String getQueuePrefix() {
055            return queuePrefix;
056        }
057    
058        /**
059         * Sets the prefix to use for all dead letter queues for queue messages
060         */
061        public void setQueuePrefix(String queuePrefix) {
062            this.queuePrefix = queuePrefix;
063        }
064    
065        public String getTopicPrefix() {
066            return topicPrefix;
067        }
068    
069        /**
070         * Sets the prefix to use for all dead letter queues for topic messages
071         */
072        public void setTopicPrefix(String topicPrefix) {
073            this.topicPrefix = topicPrefix;
074        }
075    
076        public String getQueueSuffix() {
077            return queueSuffix;
078        }
079    
080        /**
081         * Sets the suffix to use for all dead letter queues for queue messages
082         */
083        public void setQueueSuffix(String queueSuffix) {
084            this.queueSuffix = queueSuffix;
085        }
086    
087        public String getTopicSuffix() {
088            return topicSuffix;
089        }
090    
091        /**
092         * Sets the suffix to use for all dead letter queues for topic messages
093         */
094        public void setTopicSuffix(String topicSuffix) {
095            this.topicSuffix = topicSuffix;
096        }
097    
098        public boolean isUseQueueForQueueMessages() {
099            return useQueueForQueueMessages;
100        }
101    
102        /**
103         * Sets whether a queue or topic should be used for queue messages sent to a
104         * DLQ. The default is to use a Queue
105         */
106        public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) {
107            this.useQueueForQueueMessages = useQueueForQueueMessages;
108        }
109    
110        public boolean isUseQueueForTopicMessages() {
111            return useQueueForTopicMessages;
112        }
113    
114        /**
115         * Sets whether a queue or topic should be used for topic messages sent to a
116         * DLQ. The default is to use a Queue
117         */
118        public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) {
119            this.useQueueForTopicMessages = useQueueForTopicMessages;
120        }
121    
122        public boolean isDestinationPerDurableSubscriber() {
123            return destinationPerDurableSubscriber;
124        }
125    
126        /**
127         * sets whether durable topic subscriptions are to get individual dead letter destinations.
128         * When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName'
129         * The default is false.
130         * @param destinationPerDurableSubscriber
131         */
132        public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber) {
133            this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
134        }
135    
136        // Implementation methods
137        // -------------------------------------------------------------------------
138        protected ActiveMQDestination createDestination(Message message,
139                                                        String prefix,
140                                                        String suffix,
141                                                        boolean useQueue,
142                                                        Subscription subscription ) {
143            String name = null;
144    
145            if (message.getRegionDestination() != null
146                    && message.getRegionDestination().getActiveMQDestination() != null
147                    && message.getRegionDestination().getActiveMQDestination().getPhysicalName() != null
148                    && !message.getRegionDestination().getActiveMQDestination().getPhysicalName().isEmpty()){
149                name = prefix + message.getRegionDestination().getActiveMQDestination().getPhysicalName();
150            } else {
151                name = prefix + message.getDestination().getPhysicalName();
152            }
153    
154            if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription) {
155                name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey();
156            }
157    
158            if (suffix != null && !suffix.isEmpty()) {
159                name += suffix;
160            }
161    
162            if (useQueue) {
163                return new ActiveMQQueue(name);
164            } else {
165                return new ActiveMQTopic(name);
166            }
167        }
168    }