001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.advisory;
018    
019    import java.util.ArrayList;
020    
021    import javax.jms.Destination;
022    import javax.jms.JMSException;
023    import org.apache.activemq.ActiveMQMessageTransformation;
024    import org.apache.activemq.command.ActiveMQDestination;
025    import org.apache.activemq.command.ActiveMQTopic;
026    
027    public final class AdvisorySupport {
028        public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
029        public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
030                + "Connection");
031        public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
032        public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
033        public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
034        public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
035        public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
036        public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
037        public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
038        public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
039        public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
040        public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
041        public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
042        public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
043        public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
044        public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
045        public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
046        public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastProducer.";
047        public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
048        public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
049        public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
050        public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
051        public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
052        public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
053        public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
054        public static final String AGENT_TOPIC = "ActiveMQ.Agent";
055        public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
056        public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
057        public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME = "originBrokerName";
058        public static final String MSG_PROPERTY_ORIGIN_BROKER_URL = "originBrokerURL";
059        public static final String MSG_PROPERTY_USAGE_NAME = "usageName";
060        public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId";
061        public static final String MSG_PROPERTY_PRODUCER_ID = "producerId";
062        public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
063        public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
064        public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
065    
066        public static final ActiveMQTopic ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
067                TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," + QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," +
068                        TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
069        public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
070                TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
071        private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
072    
073        private AdvisorySupport() {
074        }
075    
076        public static ActiveMQTopic getConnectionAdvisoryTopic() {
077            return CONNECTION_ADVISORY_TOPIC;
078        }
079    
080        public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(Destination destination) throws JMSException {
081            return getAllDestinationAdvisoryTopics(ActiveMQMessageTransformation.transformDestination(destination));
082        }
083    
084        public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(ActiveMQDestination destination) throws JMSException {
085            ArrayList<ActiveMQTopic> result = new ArrayList<ActiveMQTopic>();
086    
087            result.add(getConsumerAdvisoryTopic(destination));
088            result.add(getProducerAdvisoryTopic(destination));
089            result.add(getExpiredMessageTopic(destination));
090            result.add(getNoConsumersAdvisoryTopic(destination));
091            result.add(getSlowConsumerAdvisoryTopic(destination));
092            result.add(getFastProducerAdvisoryTopic(destination));
093            result.add(getMessageDiscardedAdvisoryTopic(destination));
094            result.add(getMessageDeliveredAdvisoryTopic(destination));
095            result.add(getMessageConsumedAdvisoryTopic(destination));
096            result.add(getMessageDLQdAdvisoryTopic(destination));
097            result.add(getFullAdvisoryTopic(destination));
098    
099            return result.toArray(new ActiveMQTopic[0]);
100        }
101    
102        public static ActiveMQTopic getConsumerAdvisoryTopic(Destination destination) throws JMSException {
103            return getConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
104        }
105    
106        public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
107            if (destination.isQueue()) {
108                return new ActiveMQTopic(QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
109            } else {
110                return new ActiveMQTopic(TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
111            }
112        }
113    
114        public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException {
115            return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
116        }
117    
118        public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
119            if (destination.isQueue()) {
120                return new ActiveMQTopic(QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
121            } else {
122                return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX + destination.getPhysicalName());
123            }
124        }
125    
126        public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException {
127            return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
128        }
129    
130        public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
131            if (destination.isQueue()) {
132                return getExpiredQueueMessageAdvisoryTopic(destination);
133            }
134            return getExpiredTopicMessageAdvisoryTopic(destination);
135        }
136    
137        public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
138            String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
139            return new ActiveMQTopic(name);
140        }
141    
142        public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(Destination destination) throws JMSException {
143            return getExpiredQueueMessageAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
144        }
145    
146        public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
147            String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
148            return new ActiveMQTopic(name);
149        }
150    
151        public static ActiveMQTopic getNoConsumersAdvisoryTopic(Destination destination) throws JMSException {
152            return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
153        }
154    
155        public static ActiveMQTopic getNoConsumersAdvisoryTopic(ActiveMQDestination destination) {
156            if (destination.isQueue()) {
157                return getNoQueueConsumersAdvisoryTopic(destination);
158            }
159            return getNoTopicConsumersAdvisoryTopic(destination);
160        }
161    
162        public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(Destination destination) throws JMSException {
163            return getNoTopicConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
164        }
165    
166        public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
167            String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
168            return new ActiveMQTopic(name);
169        }
170    
171        public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(Destination destination) throws JMSException {
172            return getNoQueueConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
173        }
174    
175        public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
176            String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
177            return new ActiveMQTopic(name);
178        }
179    
180        public static ActiveMQTopic getSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
181            return getSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
182        }
183    
184        public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
185            String name = SLOW_CONSUMER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
186                    + destination.getPhysicalName();
187            return new ActiveMQTopic(name);
188        }
189    
190        public static ActiveMQTopic getFastProducerAdvisoryTopic(Destination destination) throws JMSException {
191            return getFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
192        }
193    
194        public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
195            String name = FAST_PRODUCER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
196                    + destination.getPhysicalName();
197            return new ActiveMQTopic(name);
198        }
199    
200        public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
201            return getMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
202        }
203    
204        public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
205            String name = MESSAGE_DISCAREDED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
206                    + destination.getPhysicalName();
207            return new ActiveMQTopic(name);
208        }
209    
210        public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
211            return getMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
212        }
213    
214        public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
215            String name = MESSAGE_DELIVERED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
216                    + destination.getPhysicalName();
217            return new ActiveMQTopic(name);
218        }
219    
220        public static ActiveMQTopic getMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
221            return getMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
222        }
223    
224        public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
225            String name = MESSAGE_CONSUMED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
226                    + destination.getPhysicalName();
227            return new ActiveMQTopic(name);
228        }
229    
230        public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) {
231            String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
232                    + destination.getPhysicalName();
233            return new ActiveMQTopic(name);
234        }
235    
236        public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
237            return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
238        }
239    
240        public static ActiveMQTopic getNetworkBridgeAdvisoryTopic() {
241            return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX);
242        }
243    
244        public static ActiveMQTopic getFullAdvisoryTopic(Destination destination) throws JMSException {
245            return getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
246        }
247    
248        public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
249            String name = FULL_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
250                    + destination.getPhysicalName();
251            return new ActiveMQTopic(name);
252        }
253    
254        public static ActiveMQTopic getDestinationAdvisoryTopic(Destination destination) throws JMSException {
255            return getDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
256        }
257    
258        public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
259            switch (destination.getDestinationType()) {
260                case ActiveMQDestination.QUEUE_TYPE:
261                    return QUEUE_ADVISORY_TOPIC;
262                case ActiveMQDestination.TOPIC_TYPE:
263                    return TOPIC_ADVISORY_TOPIC;
264                case ActiveMQDestination.TEMP_QUEUE_TYPE:
265                    return TEMP_QUEUE_ADVISORY_TOPIC;
266                case ActiveMQDestination.TEMP_TOPIC_TYPE:
267                    return TEMP_TOPIC_ADVISORY_TOPIC;
268                default:
269                    throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
270            }
271        }
272    
273        public static boolean isDestinationAdvisoryTopic(Destination destination) throws JMSException {
274            return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
275        }
276    
277        public static boolean isTempDestinationAdvisoryTopic(ActiveMQDestination destination) {
278            if (destination.isComposite()) {
279                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
280                for (int i = 0; i < compositeDestinations.length; i++) {
281                    if (!isTempDestinationAdvisoryTopic(compositeDestinations[i])) {
282                        return false;
283                    }
284                }
285                return true;
286            } else {
287                return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC);
288            }
289        }
290    
291        public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
292            if (destination.isComposite()) {
293                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
294                for (int i = 0; i < compositeDestinations.length; i++) {
295                    if (isDestinationAdvisoryTopic(compositeDestinations[i])) {
296                        return true;
297                    }
298                }
299                return false;
300            } else {
301                return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC)
302                        || destination.equals(QUEUE_ADVISORY_TOPIC) || destination.equals(TOPIC_ADVISORY_TOPIC);
303            }
304        }
305    
306        public static boolean isAdvisoryTopic(Destination destination) throws JMSException {
307            return isAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
308        }
309    
310        public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
311            if (destination != null) {
312                if (destination.isComposite()) {
313                    ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
314                    for (int i = 0; i < compositeDestinations.length; i++) {
315                        if (isAdvisoryTopic(compositeDestinations[i])) {
316                            return true;
317                        }
318                    }
319                    return false;
320                } else {
321                    return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
322                }
323            }
324            return false;
325        }
326    
327        public static boolean isConnectionAdvisoryTopic(Destination destination) throws JMSException {
328            return isConnectionAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
329        }
330    
331        public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
332            if (destination.isComposite()) {
333                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
334                for (int i = 0; i < compositeDestinations.length; i++) {
335                    if (isConnectionAdvisoryTopic(compositeDestinations[i])) {
336                        return true;
337                    }
338                }
339                return false;
340            } else {
341                return destination.equals(CONNECTION_ADVISORY_TOPIC);
342            }
343        }
344    
345        public static boolean isProducerAdvisoryTopic(Destination destination) throws JMSException {
346            return isProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
347        }
348    
349        public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
350            if (destination.isComposite()) {
351                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
352                for (int i = 0; i < compositeDestinations.length; i++) {
353                    if (isProducerAdvisoryTopic(compositeDestinations[i])) {
354                        return true;
355                    }
356                }
357                return false;
358            } else {
359                return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX);
360            }
361        }
362    
363        public static boolean isConsumerAdvisoryTopic(Destination destination) throws JMSException {
364            return isConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
365        }
366    
367        public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
368            if (destination.isComposite()) {
369                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
370                for (int i = 0; i < compositeDestinations.length; i++) {
371                    if (isConsumerAdvisoryTopic(compositeDestinations[i])) {
372                        return true;
373                    }
374                }
375                return false;
376            } else {
377                return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
378            }
379        }
380    
381        public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
382            return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
383        }
384    
385        public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
386            if (destination.isComposite()) {
387                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
388                for (int i = 0; i < compositeDestinations.length; i++) {
389                    if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
390                        return true;
391                    }
392                }
393                return false;
394            } else {
395                return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
396            }
397        }
398    
399        public static boolean isFastProducerAdvisoryTopic(Destination destination) throws JMSException {
400            return isFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
401        }
402    
403        public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
404            if (destination.isComposite()) {
405                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
406                for (int i = 0; i < compositeDestinations.length; i++) {
407                    if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
408                        return true;
409                    }
410                }
411                return false;
412            } else {
413                return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
414            }
415        }
416    
417        public static boolean isMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
418            return isMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
419        }
420    
421        public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
422            if (destination.isComposite()) {
423                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
424                for (int i = 0; i < compositeDestinations.length; i++) {
425                    if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
426                        return true;
427                    }
428                }
429                return false;
430            } else {
431                return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
432            }
433        }
434    
435        public static boolean isMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
436            return isMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
437        }
438    
439        public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
440            if (destination.isComposite()) {
441                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
442                for (int i = 0; i < compositeDestinations.length; i++) {
443                    if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
444                        return true;
445                    }
446                }
447                return false;
448            } else {
449                return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
450            }
451        }
452    
453        public static boolean isMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
454            return isMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
455        }
456    
457        public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
458            if (destination.isComposite()) {
459                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
460                for (int i = 0; i < compositeDestinations.length; i++) {
461                    if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
462                        return true;
463                    }
464                }
465                return false;
466            } else {
467                return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
468            }
469        }
470    
471        public static boolean isMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
472            return isMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
473        }
474    
475        public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
476            if (destination.isComposite()) {
477                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
478                for (int i = 0; i < compositeDestinations.length; i++) {
479                    if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
480                        return true;
481                    }
482                }
483                return false;
484            } else {
485                return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
486            }
487        }
488    
489        public static boolean isFullAdvisoryTopic(Destination destination) throws JMSException {
490            return isFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
491        }
492    
493        public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
494            if (destination.isComposite()) {
495                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
496                for (int i = 0; i < compositeDestinations.length; i++) {
497                    if (isFullAdvisoryTopic(compositeDestinations[i])) {
498                        return true;
499                    }
500                }
501                return false;
502            } else {
503                return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
504            }
505        }
506    
507        public static boolean isNetworkBridgeAdvisoryTopic(Destination destination) throws JMSException {
508            return isNetworkBridgeAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
509        }
510    
511        public static boolean isNetworkBridgeAdvisoryTopic(ActiveMQDestination destination) {
512            if (destination.isComposite()) {
513                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
514                for (int i = 0; i < compositeDestinations.length; i++) {
515                    if (isNetworkBridgeAdvisoryTopic(compositeDestinations[i])) {
516                        return true;
517                    }
518                }
519                return false;
520            } else {
521                return destination.isTopic() && destination.getPhysicalName().startsWith(NETWORK_BRIDGE_TOPIC_PREFIX);
522            }
523        }
524    
525        /**
526         * Returns the agent topic which is used to send commands to the broker
527         */
528        public static Destination getAgentDestination() {
529            return AGENT_TOPIC_DESTINATION;
530        }
531    }