001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.advisory;
018    
019    import java.util.Iterator;
020    import java.util.Map;
021    import java.util.Set;
022    import java.util.concurrent.ConcurrentHashMap;
023    
024    import org.apache.activemq.broker.Broker;
025    import org.apache.activemq.broker.BrokerFilter;
026    import org.apache.activemq.broker.ConnectionContext;
027    import org.apache.activemq.broker.ProducerBrokerExchange;
028    import org.apache.activemq.broker.region.Destination;
029    import org.apache.activemq.broker.region.MessageReference;
030    import org.apache.activemq.broker.region.Subscription;
031    import org.apache.activemq.broker.region.TopicSubscription;
032    import org.apache.activemq.command.ActiveMQDestination;
033    import org.apache.activemq.command.ActiveMQMessage;
034    import org.apache.activemq.command.ActiveMQTopic;
035    import org.apache.activemq.command.BrokerInfo;
036    import org.apache.activemq.command.Command;
037    import org.apache.activemq.command.ConnectionId;
038    import org.apache.activemq.command.ConnectionInfo;
039    import org.apache.activemq.command.ConsumerId;
040    import org.apache.activemq.command.ConsumerInfo;
041    import org.apache.activemq.command.DestinationInfo;
042    import org.apache.activemq.command.Message;
043    import org.apache.activemq.command.MessageId;
044    import org.apache.activemq.command.ProducerId;
045    import org.apache.activemq.command.ProducerInfo;
046    import org.apache.activemq.security.SecurityContext;
047    import org.apache.activemq.state.ProducerState;
048    import org.apache.activemq.usage.Usage;
049    import org.apache.activemq.util.IdGenerator;
050    import org.apache.activemq.util.LongSequenceGenerator;
051    import org.slf4j.Logger;
052    import org.slf4j.LoggerFactory;
053    
054    /**
055     * This broker filter handles tracking the state of the broker for purposes of
056     * publishing advisory messages to advisory consumers.
057     */
058    public class AdvisoryBroker extends BrokerFilter {
059    
060        private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class);
061        private static final IdGenerator ID_GENERATOR = new IdGenerator();
062    
063        protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
064        protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId, ConsumerInfo>();
065        protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
066        protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
067        protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
068        protected final ProducerId advisoryProducerId = new ProducerId();
069    
070        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
071    
072        public AdvisoryBroker(Broker next) {
073            super(next);
074            advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
075        }
076    
077        @Override
078        public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
079            super.addConnection(context, info);
080    
081            ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
082            //do not distribute usernames or passwords in advisory
083            ConnectionInfo copy = info.copy();
084            copy.setUserName("");
085            copy.setPassword("");
086            fireAdvisory(context, topic, copy);
087            connections.put(copy.getConnectionId(), copy);
088        }
089    
090        @Override
091        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
092            Subscription answer = super.addConsumer(context, info);
093    
094            // Don't advise advisory topics.
095            if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
096                ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
097                consumers.put(info.getConsumerId(), info);
098                fireConsumerAdvisory(context, info.getDestination(), topic, info);
099            } else {
100                // We need to replay all the previously collected state objects
101                // for this newly added consumer.
102                if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
103                    // Replay the connections.
104                    for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext();) {
105                        ConnectionInfo value = iter.next();
106                        ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
107                        fireAdvisory(context, topic, value, info.getConsumerId());
108                    }
109                }
110    
111                // We check here whether the Destination is Temporary Destination specific or not since we
112                // can avoid sending advisory messages to the consumer if it only wants Temporary Destination
113                // notifications.  If its not just temporary destination related destinations then we have
114                // to send them all, a composite destination could want both.
115                if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) {
116                    // Replay the temporary destinations.
117                    for (DestinationInfo destination : destinations.values()) {
118                        if (destination.getDestination().isTemporary()) {
119                            ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
120                            fireAdvisory(context, topic, destination, info.getConsumerId());
121                        }
122                    }
123                } else if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
124                    // Replay all the destinations.
125                    for (DestinationInfo destination : destinations.values()) {
126                        ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
127                        fireAdvisory(context, topic, destination, info.getConsumerId());
128                    }
129                }
130    
131                // Replay the producers.
132                if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) {
133                    for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) {
134                        ProducerInfo value = iter.next();
135                        ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
136                        fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId());
137                    }
138                }
139    
140                // Replay the consumers.
141                if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
142                    for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();) {
143                        ConsumerInfo value = iter.next();
144                        ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
145                        fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
146                    }
147                }
148    
149                // Replay network bridges
150                if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) {
151                    for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext();) {
152                        BrokerInfo key = iter.next();
153                        ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
154                        fireAdvisory(context, topic, key, null, networkBridges.get(key));
155                    }
156                }
157            }
158            return answer;
159        }
160    
161        @Override
162        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
163            super.addProducer(context, info);
164    
165            // Don't advise advisory topics.
166            if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
167                ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
168                fireProducerAdvisory(context, info.getDestination(), topic, info);
169                producers.put(info.getProducerId(), info);
170            }
171        }
172    
173        @Override
174        public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
175            Destination answer = super.addDestination(context, destination,create);
176            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
177                DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
178                DestinationInfo previous = destinations.putIfAbsent(destination, info);
179                if( previous==null ) {
180                    ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
181                    fireAdvisory(context, topic, info);
182                }
183            }
184            return answer;
185        }
186    
187        @Override
188        public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
189            ActiveMQDestination destination = info.getDestination();
190            next.addDestinationInfo(context, info);
191    
192            if (!AdvisorySupport.isAdvisoryTopic(destination)) {
193                DestinationInfo previous = destinations.putIfAbsent(destination, info);
194                if( previous==null ) {
195                    ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
196                    fireAdvisory(context, topic, info);
197                }
198            }
199        }
200    
201        @Override
202        public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
203            super.removeDestination(context, destination, timeout);
204            DestinationInfo info = destinations.remove(destination);
205            if (info != null) {
206                // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
207                info = info.copy();
208                info.setDestination(destination);
209                info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
210                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
211                fireAdvisory(context, topic, info);
212                ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination);
213                for(ActiveMQTopic advisoryDestination : advisoryDestinations) {
214                    try {
215                        next.removeDestination(context, advisoryDestination, -1);
216                    } catch (Exception expectedIfDestinationDidNotExistYet) {
217                    }
218                }
219            }
220        }
221    
222        @Override
223        public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
224            super.removeDestinationInfo(context, destInfo);
225            DestinationInfo info = destinations.remove(destInfo.getDestination());
226            if (info != null) {
227                // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
228                info = info.copy();
229                info.setDestination(destInfo.getDestination());
230                info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
231                ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
232                fireAdvisory(context, topic, info);
233                ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destInfo.getDestination());
234                for(ActiveMQTopic advisoryDestination : advisoryDestinations) {
235                    try {
236                        next.removeDestination(context, advisoryDestination, -1);
237                    } catch (Exception expectedIfDestinationDidNotExistYet) {
238                    }
239                }
240            }
241        }
242    
243        @Override
244        public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
245            super.removeConnection(context, info, error);
246    
247            ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
248            fireAdvisory(context, topic, info.createRemoveCommand());
249            connections.remove(info.getConnectionId());
250        }
251    
252        @Override
253        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
254            super.removeConsumer(context, info);
255    
256            // Don't advise advisory topics.
257            ActiveMQDestination dest = info.getDestination();
258            if (!AdvisorySupport.isAdvisoryTopic(dest)) {
259                ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
260                consumers.remove(info.getConsumerId());
261                if (!dest.isTemporary() || destinations.containsKey(dest)) {
262                    fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
263                }
264            }
265        }
266    
267        @Override
268        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
269            super.removeProducer(context, info);
270    
271            // Don't advise advisory topics.
272            ActiveMQDestination dest = info.getDestination();
273            if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
274                ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
275                producers.remove(info.getProducerId());
276                if (!dest.isTemporary() || destinations.contains(dest)) {
277                    fireProducerAdvisory(context, dest,topic, info.createRemoveCommand());
278                }
279            }
280        }
281    
282        @Override
283        public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
284            super.messageExpired(context, messageReference, subscription);
285            try {
286                if(!messageReference.isAdvisory()) {
287                    ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
288                    Message payload = messageReference.getMessage().copy();
289                    payload.clearBody();
290                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
291                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
292                    fireAdvisory(context, topic, payload, null, advisoryMessage);
293                }
294            } catch (Exception e) {
295                handleFireFailure("expired", e);
296            }
297        }
298    
299        @Override
300        public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
301            super.messageConsumed(context, messageReference);
302            try {
303                if(!messageReference.isAdvisory()) {
304                    ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
305                    Message payload = messageReference.getMessage().copy();
306                    payload.clearBody();
307                    fireAdvisory(context, topic,payload);
308                }
309            } catch (Exception e) {
310                handleFireFailure("consumed", e);
311            }
312        }
313    
314        @Override
315        public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
316            super.messageDelivered(context, messageReference);
317            try {
318                if (!messageReference.isAdvisory()) {
319                    ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
320                    Message payload = messageReference.getMessage().copy();
321                    payload.clearBody();
322                    fireAdvisory(context, topic,payload);
323                }
324            } catch (Exception e) {
325                handleFireFailure("delivered", e);
326            }
327        }
328    
329        @Override
330        public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
331            super.messageDiscarded(context, sub, messageReference);
332            try {
333                if (!messageReference.isAdvisory()) {
334                    ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
335                    Message payload = messageReference.getMessage().copy();
336                    payload.clearBody();
337                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
338                    if (sub instanceof TopicSubscription) {
339                        advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription)sub).discarded());
340                    }
341                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
342                    fireAdvisory(context, topic, payload, null, advisoryMessage);
343                }
344            } catch (Exception e) {
345                handleFireFailure("discarded", e);
346            }
347        }
348    
349        @Override
350        public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
351            super.slowConsumer(context, destination,subs);
352            try {
353                if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
354                    ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
355                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
356                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
357                    fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
358                }
359            } catch (Exception e) {
360                handleFireFailure("slow consumer", e);
361            }
362        }
363    
364        @Override
365        public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
366            super.fastProducer(context, producerInfo, destination);
367            try {
368                if (!AdvisorySupport.isAdvisoryTopic(destination)) {
369                    ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(destination);
370                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
371                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
372                    fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
373                }
374            } catch (Exception e) {
375                handleFireFailure("fast producer", e);
376            }
377        }
378    
379        @Override
380        public void isFull(ConnectionContext context, Destination destination, Usage usage) {
381            super.isFull(context, destination, usage);
382            if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) {
383                try {
384    
385                    ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination());
386                    ActiveMQMessage advisoryMessage = new ActiveMQMessage();
387                    advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName());
388                    fireAdvisory(context, topic, null, null, advisoryMessage);
389    
390                } catch (Exception e) {
391                    handleFireFailure("is full", e);
392                }
393            }
394        }
395    
396        @Override
397        public void nowMasterBroker() {
398            super.nowMasterBroker();
399            try {
400                ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
401                ActiveMQMessage advisoryMessage = new ActiveMQMessage();
402                ConnectionContext context = new ConnectionContext();
403                context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
404                context.setBroker(getBrokerService().getBroker());
405                fireAdvisory(context, topic,null,null,advisoryMessage);
406            } catch (Exception e) {
407                handleFireFailure("now master broker", e);
408            }
409        }
410    
411        @Override
412        public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
413                                          Subscription subscription){
414            super.sendToDeadLetterQueue(context, messageReference, subscription);
415            try {
416                if(!messageReference.isAdvisory()) {
417                    ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
418                    Message payload = messageReference.getMessage().copy();
419                    payload.clearBody();
420                    fireAdvisory(context, topic,payload);
421                }
422            } catch (Exception e) {
423                handleFireFailure("add to DLQ", e);
424            }
425        }
426    
427        @Override
428        public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
429            try {
430             if (brokerInfo != null) {
431                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
432                 advisoryMessage.setBooleanProperty("started", true);
433                 advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);
434                 advisoryMessage.setStringProperty("remoteIp", remoteIp);
435                 networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
436    
437                 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
438    
439                 ConnectionContext context = new ConnectionContext();
440                 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
441                 context.setBroker(getBrokerService().getBroker());
442                 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
443             }
444            } catch (Exception e) {
445                handleFireFailure("network bridge started", e);
446            }
447        }
448    
449        @Override
450        public void networkBridgeStopped(BrokerInfo brokerInfo) {
451            try {
452             if (brokerInfo != null) {
453                 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
454                 advisoryMessage.setBooleanProperty("started", false);
455                 networkBridges.remove(brokerInfo);
456    
457                 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
458    
459                 ConnectionContext context = new ConnectionContext();
460                 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
461                 context.setBroker(getBrokerService().getBroker());
462                 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
463             }
464            } catch (Exception e) {
465                handleFireFailure("network bridge stopped", e);
466            }
467        }
468    
469        private void handleFireFailure(String message, Throwable cause) {
470            LOG.warn("Failed to fire "  + message + " advisory, reason: " + cause);
471            if (LOG.isDebugEnabled()) {
472                LOG.debug(message + " detail", cause);
473            }
474        }
475    
476        protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
477            fireAdvisory(context, topic, command, null);
478        }
479    
480        protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
481            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
482            fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
483        }
484    
485        protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception {
486            fireConsumerAdvisory(context, consumerDestination,topic, command, null);
487        }
488    
489        protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
490            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
491            int count = 0;
492            Set<Destination>set = getDestinations(consumerDestination);
493            if (set != null) {
494                for (Destination dest:set) {
495                    count += dest.getDestinationStatistics().getConsumers().getCount();
496                }
497            }
498            advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count);
499    
500            fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
501        }
502    
503        protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
504            fireProducerAdvisory(context,producerDestination, topic, command, null);
505        }
506    
507        protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
508            ActiveMQMessage advisoryMessage = new ActiveMQMessage();
509            int count = 0;
510            if (producerDestination != null) {
511                Set<Destination> set = getDestinations(producerDestination);
512                if (set != null) {
513                    for (Destination dest : set) {
514                        count += dest.getDestinationStatistics().getProducers().getCount();
515                    }
516                }
517            }
518            advisoryMessage.setIntProperty("producerCount", count);
519            fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
520        }
521    
522        protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
523            if (getBrokerService().isStarted()) {
524                //set properties
525                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
526                String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
527                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
528    
529                String url = getBrokerService().getVmConnectorURI().toString();
530                if (getBrokerService().getDefaultSocketURIString() != null) {
531                    url = getBrokerService().getDefaultSocketURIString();
532                }
533                advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
534    
535                //set the data structure
536                advisoryMessage.setDataStructure(command);
537                advisoryMessage.setPersistent(false);
538                advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
539                advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
540                advisoryMessage.setTargetConsumerId(targetConsumerId);
541                advisoryMessage.setDestination(topic);
542                advisoryMessage.setResponseRequired(false);
543                advisoryMessage.setProducerId(advisoryProducerId);
544                boolean originalFlowControl = context.isProducerFlowControl();
545                final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
546                producerExchange.setConnectionContext(context);
547                producerExchange.setMutable(true);
548                producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
549                try {
550                    context.setProducerFlowControl(false);
551                    next.send(producerExchange, advisoryMessage);
552                } finally {
553                    context.setProducerFlowControl(originalFlowControl);
554                }
555            }
556        }
557    
558        public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() {
559            return connections;
560        }
561    
562        public Map<ConsumerId, ConsumerInfo> getAdvisoryConsumers() {
563            return consumers;
564        }
565    
566        public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {
567            return producers;
568        }
569    
570        public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() {
571            return destinations;
572        }
573    }