001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.advisory;
018    
019    import java.util.ArrayList;
020    
021    import javax.jms.Destination;
022    import javax.jms.JMSException;
023    import org.apache.activemq.ActiveMQMessageTransformation;
024    import org.apache.activemq.command.ActiveMQDestination;
025    import org.apache.activemq.command.ActiveMQTopic;
026    
027    public final class AdvisorySupport {
028        public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
029        public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
030                + "Connection");
031        public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
032        public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
033        public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
034        public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
035        public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
036        public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
037        public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
038        public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
039        public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
040        public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
041        public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
042        public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
043        public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
044        public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
045        public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
046        public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastProducer.";
047        public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
048        public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
049        public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
050        public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
051        public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
052        public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
053        public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
054        public static final String NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX = NETWORK_BRIDGE_TOPIC_PREFIX + ".ForwardFailure";
055        public static final String AGENT_TOPIC = "ActiveMQ.Agent";
056        public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
057        public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
058        public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME = "originBrokerName";
059        public static final String MSG_PROPERTY_ORIGIN_BROKER_URL = "originBrokerURL";
060        public static final String MSG_PROPERTY_USAGE_NAME = "usageName";
061        public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId";
062        public static final String MSG_PROPERTY_PRODUCER_ID = "producerId";
063        public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
064        public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
065        public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
066    
067        public static final ActiveMQTopic ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
068                TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," + QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," +
069                        TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
070        public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
071                TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
072        private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
073    
074        private AdvisorySupport() {
075        }
076    
077        public static ActiveMQTopic getConnectionAdvisoryTopic() {
078            return CONNECTION_ADVISORY_TOPIC;
079        }
080    
081        public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(Destination destination) throws JMSException {
082            return getAllDestinationAdvisoryTopics(ActiveMQMessageTransformation.transformDestination(destination));
083        }
084    
085        public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(ActiveMQDestination destination) throws JMSException {
086            ArrayList<ActiveMQTopic> result = new ArrayList<ActiveMQTopic>();
087    
088            result.add(getConsumerAdvisoryTopic(destination));
089            result.add(getProducerAdvisoryTopic(destination));
090            result.add(getExpiredMessageTopic(destination));
091            result.add(getNoConsumersAdvisoryTopic(destination));
092            result.add(getSlowConsumerAdvisoryTopic(destination));
093            result.add(getFastProducerAdvisoryTopic(destination));
094            result.add(getMessageDiscardedAdvisoryTopic(destination));
095            result.add(getMessageDeliveredAdvisoryTopic(destination));
096            result.add(getMessageConsumedAdvisoryTopic(destination));
097            result.add(getMessageDLQdAdvisoryTopic(destination));
098            result.add(getFullAdvisoryTopic(destination));
099    
100            return result.toArray(new ActiveMQTopic[0]);
101        }
102    
103        public static ActiveMQTopic getConsumerAdvisoryTopic(Destination destination) throws JMSException {
104            return getConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
105        }
106    
107        public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
108            if (destination.isQueue()) {
109                return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
110            } else {
111                return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
112            }
113        }
114    
115        public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException {
116            return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
117        }
118    
119        public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
120            if (destination.isQueue()) {
121                return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
122            } else {
123                return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
124            }
125        }
126    
127        public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException {
128            return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
129        }
130    
131        public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
132            if (destination.isQueue()) {
133                return getExpiredQueueMessageAdvisoryTopic(destination);
134            }
135            return getExpiredTopicMessageAdvisoryTopic(destination);
136        }
137    
138        public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
139            String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
140            return new ActiveMQTopic(name);
141        }
142    
143        public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(Destination destination) throws JMSException {
144            return getExpiredQueueMessageAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
145        }
146    
147        public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
148            String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
149            return new ActiveMQTopic(name);
150        }
151    
152        public static ActiveMQTopic getNoConsumersAdvisoryTopic(Destination destination) throws JMSException {
153            return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
154        }
155    
156        public static ActiveMQTopic getNoConsumersAdvisoryTopic(ActiveMQDestination destination) {
157            if (destination.isQueue()) {
158                return getNoQueueConsumersAdvisoryTopic(destination);
159            }
160            return getNoTopicConsumersAdvisoryTopic(destination);
161        }
162    
163        public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(Destination destination) throws JMSException {
164            return getNoTopicConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
165        }
166    
167        public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
168            String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
169            return new ActiveMQTopic(name);
170        }
171    
172        public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(Destination destination) throws JMSException {
173            return getNoQueueConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
174        }
175    
176        public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
177            String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
178            return new ActiveMQTopic(name);
179        }
180    
181        public static ActiveMQTopic getSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
182            return getSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
183        }
184    
185        public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
186            String name = SLOW_CONSUMER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
187                    + destination.getPhysicalName();
188            return new ActiveMQTopic(name);
189        }
190    
191        public static ActiveMQTopic getFastProducerAdvisoryTopic(Destination destination) throws JMSException {
192            return getFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
193        }
194    
195        public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
196            String name = FAST_PRODUCER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
197                    + destination.getPhysicalName();
198            return new ActiveMQTopic(name);
199        }
200    
201        public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
202            return getMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
203        }
204    
205        public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
206            String name = MESSAGE_DISCAREDED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
207                    + destination.getPhysicalName();
208            return new ActiveMQTopic(name);
209        }
210    
211        public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
212            return getMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
213        }
214    
215        public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
216            String name = MESSAGE_DELIVERED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
217                    + destination.getPhysicalName();
218            return new ActiveMQTopic(name);
219        }
220    
221        public static ActiveMQTopic getMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
222            return getMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
223        }
224    
225        public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
226            String name = MESSAGE_CONSUMED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
227                    + destination.getPhysicalName();
228            return new ActiveMQTopic(name);
229        }
230    
231        public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) {
232            String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
233                    + destination.getPhysicalName();
234            return new ActiveMQTopic(name);
235        }
236    
237        public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
238            return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
239        }
240    
241        public static ActiveMQTopic getNetworkBridgeAdvisoryTopic() {
242            return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX);
243        }
244    
245        public static ActiveMQTopic getFullAdvisoryTopic(Destination destination) throws JMSException {
246            return getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
247        }
248    
249        public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
250            String name = FULL_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
251                    + destination.getPhysicalName();
252            return new ActiveMQTopic(name);
253        }
254    
255        public static ActiveMQTopic getDestinationAdvisoryTopic(Destination destination) throws JMSException {
256            return getDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
257        }
258    
259        public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
260            switch (destination.getDestinationType()) {
261                case ActiveMQDestination.QUEUE_TYPE:
262                    return QUEUE_ADVISORY_TOPIC;
263                case ActiveMQDestination.TOPIC_TYPE:
264                    return TOPIC_ADVISORY_TOPIC;
265                case ActiveMQDestination.TEMP_QUEUE_TYPE:
266                    return TEMP_QUEUE_ADVISORY_TOPIC;
267                case ActiveMQDestination.TEMP_TOPIC_TYPE:
268                    return TEMP_TOPIC_ADVISORY_TOPIC;
269                default:
270                    throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
271            }
272        }
273    
274        public static boolean isDestinationAdvisoryTopic(Destination destination) throws JMSException {
275            return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
276        }
277    
278        public static boolean isTempDestinationAdvisoryTopic(ActiveMQDestination destination) {
279            if (destination.isComposite()) {
280                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
281                for (int i = 0; i < compositeDestinations.length; i++) {
282                    if (!isTempDestinationAdvisoryTopic(compositeDestinations[i])) {
283                        return false;
284                    }
285                }
286                return true;
287            } else {
288                return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC);
289            }
290        }
291    
292        public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
293            if (destination.isComposite()) {
294                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
295                for (int i = 0; i < compositeDestinations.length; i++) {
296                    if (isDestinationAdvisoryTopic(compositeDestinations[i])) {
297                        return true;
298                    }
299                }
300                return false;
301            } else {
302                return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC)
303                        || destination.equals(QUEUE_ADVISORY_TOPIC) || destination.equals(TOPIC_ADVISORY_TOPIC);
304            }
305        }
306    
307        public static boolean isAdvisoryTopic(Destination destination) throws JMSException {
308            return isAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
309        }
310    
311        public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
312            if (destination != null) {
313                if (destination.isComposite()) {
314                    ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
315                    for (int i = 0; i < compositeDestinations.length; i++) {
316                        if (isAdvisoryTopic(compositeDestinations[i])) {
317                            return true;
318                        }
319                    }
320                    return false;
321                } else {
322                    return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
323                }
324            }
325            return false;
326        }
327    
328        public static boolean isConnectionAdvisoryTopic(Destination destination) throws JMSException {
329            return isConnectionAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
330        }
331    
332        public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
333            if (destination.isComposite()) {
334                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
335                for (int i = 0; i < compositeDestinations.length; i++) {
336                    if (isConnectionAdvisoryTopic(compositeDestinations[i])) {
337                        return true;
338                    }
339                }
340                return false;
341            } else {
342                return destination.equals(CONNECTION_ADVISORY_TOPIC);
343            }
344        }
345    
346        public static boolean isProducerAdvisoryTopic(Destination destination) throws JMSException {
347            return isProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
348        }
349    
350        public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
351            if (destination.isComposite()) {
352                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
353                for (int i = 0; i < compositeDestinations.length; i++) {
354                    if (isProducerAdvisoryTopic(compositeDestinations[i])) {
355                        return true;
356                    }
357                }
358                return false;
359            } else {
360                return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX);
361            }
362        }
363    
364        public static boolean isConsumerAdvisoryTopic(Destination destination) throws JMSException {
365            return isConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
366        }
367    
368        public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
369            if (destination.isComposite()) {
370                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
371                for (int i = 0; i < compositeDestinations.length; i++) {
372                    if (isConsumerAdvisoryTopic(compositeDestinations[i])) {
373                        return true;
374                    }
375                }
376                return false;
377            } else {
378                return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
379            }
380        }
381    
382        public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
383            return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
384        }
385    
386        public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
387            if (destination.isComposite()) {
388                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
389                for (int i = 0; i < compositeDestinations.length; i++) {
390                    if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
391                        return true;
392                    }
393                }
394                return false;
395            } else {
396                return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
397            }
398        }
399    
400        public static boolean isFastProducerAdvisoryTopic(Destination destination) throws JMSException {
401            return isFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
402        }
403    
404        public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
405            if (destination.isComposite()) {
406                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
407                for (int i = 0; i < compositeDestinations.length; i++) {
408                    if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
409                        return true;
410                    }
411                }
412                return false;
413            } else {
414                return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
415            }
416        }
417    
418        public static boolean isMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
419            return isMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
420        }
421    
422        public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
423            if (destination.isComposite()) {
424                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
425                for (int i = 0; i < compositeDestinations.length; i++) {
426                    if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
427                        return true;
428                    }
429                }
430                return false;
431            } else {
432                return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
433            }
434        }
435    
436        public static boolean isMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
437            return isMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
438        }
439    
440        public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
441            if (destination.isComposite()) {
442                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
443                for (int i = 0; i < compositeDestinations.length; i++) {
444                    if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
445                        return true;
446                    }
447                }
448                return false;
449            } else {
450                return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
451            }
452        }
453    
454        public static boolean isMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
455            return isMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
456        }
457    
458        public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
459            if (destination.isComposite()) {
460                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
461                for (int i = 0; i < compositeDestinations.length; i++) {
462                    if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
463                        return true;
464                    }
465                }
466                return false;
467            } else {
468                return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
469            }
470        }
471    
472        public static boolean isMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
473            return isMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
474        }
475    
476        public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
477            if (destination.isComposite()) {
478                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
479                for (int i = 0; i < compositeDestinations.length; i++) {
480                    if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
481                        return true;
482                    }
483                }
484                return false;
485            } else {
486                return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
487            }
488        }
489    
490        public static boolean isFullAdvisoryTopic(Destination destination) throws JMSException {
491            return isFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
492        }
493    
494        public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
495            if (destination.isComposite()) {
496                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
497                for (int i = 0; i < compositeDestinations.length; i++) {
498                    if (isFullAdvisoryTopic(compositeDestinations[i])) {
499                        return true;
500                    }
501                }
502                return false;
503            } else {
504                return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
505            }
506        }
507    
508        public static boolean isNetworkBridgeAdvisoryTopic(Destination destination) throws JMSException {
509            return isNetworkBridgeAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
510        }
511    
512        public static boolean isNetworkBridgeAdvisoryTopic(ActiveMQDestination destination) {
513            if (destination.isComposite()) {
514                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
515                for (int i = 0; i < compositeDestinations.length; i++) {
516                    if (isNetworkBridgeAdvisoryTopic(compositeDestinations[i])) {
517                        return true;
518                    }
519                }
520                return false;
521            } else {
522                return destination.isTopic() && destination.getPhysicalName().startsWith(NETWORK_BRIDGE_TOPIC_PREFIX);
523            }
524        }
525    
526        /**
527         * Returns the agent topic which is used to send commands to the broker
528         */
529        public static Destination getAgentDestination() {
530            return AGENT_TOPIC_DESTINATION;
531        }
532    
533        public static ActiveMQTopic getNetworkBridgeForwardFailureAdvisoryTopic() {
534            return new ActiveMQTopic(NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX);
535        }
536    }