001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.region.policy;
018
019 import org.apache.activemq.broker.region.DurableTopicSubscription;
020 import org.apache.activemq.broker.region.Subscription;
021 import org.apache.activemq.command.ActiveMQDestination;
022 import org.apache.activemq.command.ActiveMQQueue;
023 import org.apache.activemq.command.ActiveMQTopic;
024 import org.apache.activemq.command.Message;
025
026 /**
027 * A {@link DeadLetterStrategy} where each destination has its own individual
028 * DLQ using the subject naming hierarchy.
029 *
030 * @org.apache.xbean.XBean
031 *
032 */
033 public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
034
035 private String topicPrefix = "ActiveMQ.DLQ.Topic.";
036 private String queuePrefix = "ActiveMQ.DLQ.Queue.";
037 private String topicSuffix;
038 private String queueSuffix;
039 private boolean useQueueForQueueMessages = true;
040 private boolean useQueueForTopicMessages = true;
041 private boolean destinationPerDurableSubscriber;
042
043 public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
044 if (message.getDestination().isQueue()) {
045 return createDestination(message, queuePrefix, queueSuffix, useQueueForQueueMessages, subscription);
046 } else {
047 return createDestination(message, topicPrefix, topicSuffix, useQueueForTopicMessages, subscription);
048 }
049 }
050
051 // Properties
052 // -------------------------------------------------------------------------
053
054 public String getQueuePrefix() {
055 return queuePrefix;
056 }
057
058 /**
059 * Sets the prefix to use for all dead letter queues for queue messages
060 */
061 public void setQueuePrefix(String queuePrefix) {
062 this.queuePrefix = queuePrefix;
063 }
064
065 public String getTopicPrefix() {
066 return topicPrefix;
067 }
068
069 /**
070 * Sets the prefix to use for all dead letter queues for topic messages
071 */
072 public void setTopicPrefix(String topicPrefix) {
073 this.topicPrefix = topicPrefix;
074 }
075
076 public String getQueueSuffix() {
077 return queueSuffix;
078 }
079
080 /**
081 * Sets the suffix to use for all dead letter queues for queue messages
082 */
083 public void setQueueSuffix(String queueSuffix) {
084 this.queueSuffix = queueSuffix;
085 }
086
087 public String getTopicSuffix() {
088 return topicSuffix;
089 }
090
091 /**
092 * Sets the suffix to use for all dead letter queues for topic messages
093 */
094 public void setTopicSuffix(String topicSuffix) {
095 this.topicSuffix = topicSuffix;
096 }
097
098 public boolean isUseQueueForQueueMessages() {
099 return useQueueForQueueMessages;
100 }
101
102 /**
103 * Sets whether a queue or topic should be used for queue messages sent to a
104 * DLQ. The default is to use a Queue
105 */
106 public void setUseQueueForQueueMessages(boolean useQueueForQueueMessages) {
107 this.useQueueForQueueMessages = useQueueForQueueMessages;
108 }
109
110 public boolean isUseQueueForTopicMessages() {
111 return useQueueForTopicMessages;
112 }
113
114 /**
115 * Sets whether a queue or topic should be used for topic messages sent to a
116 * DLQ. The default is to use a Queue
117 */
118 public void setUseQueueForTopicMessages(boolean useQueueForTopicMessages) {
119 this.useQueueForTopicMessages = useQueueForTopicMessages;
120 }
121
122 public boolean isDestinationPerDurableSubscriber() {
123 return destinationPerDurableSubscriber;
124 }
125
126 /**
127 * sets whether durable topic subscriptions are to get individual dead letter destinations.
128 * When true, the DLQ is of the form 'topicPrefix.clientId:subscriptionName'
129 * The default is false.
130 * @param destinationPerDurableSubscriber
131 */
132 public void setDestinationPerDurableSubscriber(boolean destinationPerDurableSubscriber) {
133 this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
134 }
135
136 // Implementation methods
137 // -------------------------------------------------------------------------
138 protected ActiveMQDestination createDestination(Message message,
139 String prefix,
140 String suffix,
141 boolean useQueue,
142 Subscription subscription ) {
143 String name = null;
144
145 if (message.getRegionDestination() != null
146 && message.getRegionDestination().getActiveMQDestination() != null
147 && message.getRegionDestination().getActiveMQDestination().getPhysicalName() != null
148 && !message.getRegionDestination().getActiveMQDestination().getPhysicalName().isEmpty()){
149 name = prefix + message.getRegionDestination().getActiveMQDestination().getPhysicalName();
150 } else {
151 name = prefix + message.getDestination().getPhysicalName();
152 }
153
154 if (destinationPerDurableSubscriber && subscription instanceof DurableTopicSubscription) {
155 name += "." + ((DurableTopicSubscription)subscription).getSubscriptionKey();
156 }
157
158 if (suffix != null && !suffix.isEmpty()) {
159 name += suffix;
160 }
161
162 if (useQueue) {
163 return new ActiveMQQueue(name);
164 } else {
165 return new ActiveMQTopic(name);
166 }
167 }
168 }