001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.advisory;
018
019 import java.util.Iterator;
020 import java.util.Map;
021 import java.util.Set;
022 import java.util.concurrent.ConcurrentHashMap;
023
024 import org.apache.activemq.broker.Broker;
025 import org.apache.activemq.broker.BrokerFilter;
026 import org.apache.activemq.broker.ConnectionContext;
027 import org.apache.activemq.broker.ProducerBrokerExchange;
028 import org.apache.activemq.broker.region.Destination;
029 import org.apache.activemq.broker.region.MessageReference;
030 import org.apache.activemq.broker.region.Subscription;
031 import org.apache.activemq.broker.region.TopicSubscription;
032 import org.apache.activemq.command.ActiveMQDestination;
033 import org.apache.activemq.command.ActiveMQMessage;
034 import org.apache.activemq.command.ActiveMQTopic;
035 import org.apache.activemq.command.BrokerInfo;
036 import org.apache.activemq.command.Command;
037 import org.apache.activemq.command.ConnectionId;
038 import org.apache.activemq.command.ConnectionInfo;
039 import org.apache.activemq.command.ConsumerId;
040 import org.apache.activemq.command.ConsumerInfo;
041 import org.apache.activemq.command.DestinationInfo;
042 import org.apache.activemq.command.Message;
043 import org.apache.activemq.command.MessageId;
044 import org.apache.activemq.command.ProducerId;
045 import org.apache.activemq.command.ProducerInfo;
046 import org.apache.activemq.security.SecurityContext;
047 import org.apache.activemq.state.ProducerState;
048 import org.apache.activemq.usage.Usage;
049 import org.apache.activemq.util.IdGenerator;
050 import org.apache.activemq.util.LongSequenceGenerator;
051 import org.slf4j.Logger;
052 import org.slf4j.LoggerFactory;
053
054 /**
055 * This broker filter handles tracking the state of the broker for purposes of
056 * publishing advisory messages to advisory consumers.
057 */
058 public class AdvisoryBroker extends BrokerFilter {
059
060 private static final Logger LOG = LoggerFactory.getLogger(AdvisoryBroker.class);
061 private static final IdGenerator ID_GENERATOR = new IdGenerator();
062
063 protected final ConcurrentHashMap<ConnectionId, ConnectionInfo> connections = new ConcurrentHashMap<ConnectionId, ConnectionInfo>();
064 protected final ConcurrentHashMap<ConsumerId, ConsumerInfo> consumers = new ConcurrentHashMap<ConsumerId, ConsumerInfo>();
065 protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
066 protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
067 protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
068 protected final ProducerId advisoryProducerId = new ProducerId();
069
070 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
071
072 public AdvisoryBroker(Broker next) {
073 super(next);
074 advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
075 }
076
077 @Override
078 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
079 super.addConnection(context, info);
080
081 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
082 //do not distribute usernames or passwords in advisory
083 ConnectionInfo copy = info.copy();
084 copy.setUserName("");
085 copy.setPassword("");
086 fireAdvisory(context, topic, copy);
087 connections.put(copy.getConnectionId(), copy);
088 }
089
090 @Override
091 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
092 Subscription answer = super.addConsumer(context, info);
093
094 // Don't advise advisory topics.
095 if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
096 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
097 consumers.put(info.getConsumerId(), info);
098 fireConsumerAdvisory(context, info.getDestination(), topic, info);
099 } else {
100 // We need to replay all the previously collected state objects
101 // for this newly added consumer.
102 if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
103 // Replay the connections.
104 for (Iterator<ConnectionInfo> iter = connections.values().iterator(); iter.hasNext();) {
105 ConnectionInfo value = iter.next();
106 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
107 fireAdvisory(context, topic, value, info.getConsumerId());
108 }
109 }
110
111 // We check here whether the Destination is Temporary Destination specific or not since we
112 // can avoid sending advisory messages to the consumer if it only wants Temporary Destination
113 // notifications. If its not just temporary destination related destinations then we have
114 // to send them all, a composite destination could want both.
115 if (AdvisorySupport.isTempDestinationAdvisoryTopic(info.getDestination())) {
116 // Replay the temporary destinations.
117 for (DestinationInfo destination : destinations.values()) {
118 if (destination.getDestination().isTemporary()) {
119 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
120 fireAdvisory(context, topic, destination, info.getConsumerId());
121 }
122 }
123 } else if (AdvisorySupport.isDestinationAdvisoryTopic(info.getDestination())) {
124 // Replay all the destinations.
125 for (DestinationInfo destination : destinations.values()) {
126 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination.getDestination());
127 fireAdvisory(context, topic, destination, info.getConsumerId());
128 }
129 }
130
131 // Replay the producers.
132 if (AdvisorySupport.isProducerAdvisoryTopic(info.getDestination())) {
133 for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) {
134 ProducerInfo value = iter.next();
135 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
136 fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId());
137 }
138 }
139
140 // Replay the consumers.
141 if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) {
142 for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();) {
143 ConsumerInfo value = iter.next();
144 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
145 fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
146 }
147 }
148
149 // Replay network bridges
150 if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) {
151 for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext();) {
152 BrokerInfo key = iter.next();
153 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
154 fireAdvisory(context, topic, key, null, networkBridges.get(key));
155 }
156 }
157 }
158 return answer;
159 }
160
161 @Override
162 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
163 super.addProducer(context, info);
164
165 // Don't advise advisory topics.
166 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
167 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
168 fireProducerAdvisory(context, info.getDestination(), topic, info);
169 producers.put(info.getProducerId(), info);
170 }
171 }
172
173 @Override
174 public Destination addDestination(ConnectionContext context, ActiveMQDestination destination,boolean create) throws Exception {
175 Destination answer = super.addDestination(context, destination,create);
176 if (!AdvisorySupport.isAdvisoryTopic(destination)) {
177 DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
178 DestinationInfo previous = destinations.putIfAbsent(destination, info);
179 if( previous==null ) {
180 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
181 fireAdvisory(context, topic, info);
182 }
183 }
184 return answer;
185 }
186
187 @Override
188 public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
189 ActiveMQDestination destination = info.getDestination();
190 next.addDestinationInfo(context, info);
191
192 if (!AdvisorySupport.isAdvisoryTopic(destination)) {
193 DestinationInfo previous = destinations.putIfAbsent(destination, info);
194 if( previous==null ) {
195 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
196 fireAdvisory(context, topic, info);
197 }
198 }
199 }
200
201 @Override
202 public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
203 super.removeDestination(context, destination, timeout);
204 DestinationInfo info = destinations.remove(destination);
205 if (info != null) {
206 // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
207 info = info.copy();
208 info.setDestination(destination);
209 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
210 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destination);
211 fireAdvisory(context, topic, info);
212 ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destination);
213 for(ActiveMQTopic advisoryDestination : advisoryDestinations) {
214 try {
215 next.removeDestination(context, advisoryDestination, -1);
216 } catch (Exception expectedIfDestinationDidNotExistYet) {
217 }
218 }
219 }
220 }
221
222 @Override
223 public void removeDestinationInfo(ConnectionContext context, DestinationInfo destInfo) throws Exception {
224 super.removeDestinationInfo(context, destInfo);
225 DestinationInfo info = destinations.remove(destInfo.getDestination());
226 if (info != null) {
227 // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate
228 info = info.copy();
229 info.setDestination(destInfo.getDestination());
230 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
231 ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(destInfo.getDestination());
232 fireAdvisory(context, topic, info);
233 ActiveMQTopic[] advisoryDestinations = AdvisorySupport.getAllDestinationAdvisoryTopics(destInfo.getDestination());
234 for(ActiveMQTopic advisoryDestination : advisoryDestinations) {
235 try {
236 next.removeDestination(context, advisoryDestination, -1);
237 } catch (Exception expectedIfDestinationDidNotExistYet) {
238 }
239 }
240 }
241 }
242
243 @Override
244 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
245 super.removeConnection(context, info, error);
246
247 ActiveMQTopic topic = AdvisorySupport.getConnectionAdvisoryTopic();
248 fireAdvisory(context, topic, info.createRemoveCommand());
249 connections.remove(info.getConnectionId());
250 }
251
252 @Override
253 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
254 super.removeConsumer(context, info);
255
256 // Don't advise advisory topics.
257 ActiveMQDestination dest = info.getDestination();
258 if (!AdvisorySupport.isAdvisoryTopic(dest)) {
259 ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
260 consumers.remove(info.getConsumerId());
261 if (!dest.isTemporary() || destinations.containsKey(dest)) {
262 fireConsumerAdvisory(context,dest, topic, info.createRemoveCommand());
263 }
264 }
265 }
266
267 @Override
268 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
269 super.removeProducer(context, info);
270
271 // Don't advise advisory topics.
272 ActiveMQDestination dest = info.getDestination();
273 if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(dest)) {
274 ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(dest);
275 producers.remove(info.getProducerId());
276 if (!dest.isTemporary() || destinations.contains(dest)) {
277 fireProducerAdvisory(context, dest,topic, info.createRemoveCommand());
278 }
279 }
280 }
281
282 @Override
283 public void messageExpired(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
284 super.messageExpired(context, messageReference, subscription);
285 try {
286 if(!messageReference.isAdvisory()) {
287 ActiveMQTopic topic = AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
288 Message payload = messageReference.getMessage().copy();
289 payload.clearBody();
290 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
291 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_MESSAGE_ID, payload.getMessageId().toString());
292 fireAdvisory(context, topic, payload, null, advisoryMessage);
293 }
294 } catch (Exception e) {
295 handleFireFailure("expired", e);
296 }
297 }
298
299 @Override
300 public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
301 super.messageConsumed(context, messageReference);
302 try {
303 if(!messageReference.isAdvisory()) {
304 ActiveMQTopic topic = AdvisorySupport.getMessageConsumedAdvisoryTopic(messageReference.getMessage().getDestination());
305 Message payload = messageReference.getMessage().copy();
306 payload.clearBody();
307 fireAdvisory(context, topic,payload);
308 }
309 } catch (Exception e) {
310 handleFireFailure("consumed", e);
311 }
312 }
313
314 @Override
315 public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
316 super.messageDelivered(context, messageReference);
317 try {
318 if (!messageReference.isAdvisory()) {
319 ActiveMQTopic topic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(messageReference.getMessage().getDestination());
320 Message payload = messageReference.getMessage().copy();
321 payload.clearBody();
322 fireAdvisory(context, topic,payload);
323 }
324 } catch (Exception e) {
325 handleFireFailure("delivered", e);
326 }
327 }
328
329 @Override
330 public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
331 super.messageDiscarded(context, sub, messageReference);
332 try {
333 if (!messageReference.isAdvisory()) {
334 ActiveMQTopic topic = AdvisorySupport.getMessageDiscardedAdvisoryTopic(messageReference.getMessage().getDestination());
335 Message payload = messageReference.getMessage().copy();
336 payload.clearBody();
337 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
338 if (sub instanceof TopicSubscription) {
339 advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT, ((TopicSubscription)sub).discarded());
340 }
341 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, sub.getConsumerInfo().getConsumerId().toString());
342 fireAdvisory(context, topic, payload, null, advisoryMessage);
343 }
344 } catch (Exception e) {
345 handleFireFailure("discarded", e);
346 }
347 }
348
349 @Override
350 public void slowConsumer(ConnectionContext context, Destination destination,Subscription subs) {
351 super.slowConsumer(context, destination,subs);
352 try {
353 if (!AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
354 ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination.getActiveMQDestination());
355 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
356 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, subs.getConsumerInfo().getConsumerId().toString());
357 fireAdvisory(context, topic, subs.getConsumerInfo(), null, advisoryMessage);
358 }
359 } catch (Exception e) {
360 handleFireFailure("slow consumer", e);
361 }
362 }
363
364 @Override
365 public void fastProducer(ConnectionContext context,ProducerInfo producerInfo,ActiveMQDestination destination) {
366 super.fastProducer(context, producerInfo, destination);
367 try {
368 if (!AdvisorySupport.isAdvisoryTopic(destination)) {
369 ActiveMQTopic topic = AdvisorySupport.getFastProducerAdvisoryTopic(destination);
370 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
371 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_PRODUCER_ID, producerInfo.getProducerId().toString());
372 fireAdvisory(context, topic, producerInfo, null, advisoryMessage);
373 }
374 } catch (Exception e) {
375 handleFireFailure("fast producer", e);
376 }
377 }
378
379 @Override
380 public void isFull(ConnectionContext context, Destination destination, Usage usage) {
381 super.isFull(context, destination, usage);
382 if (AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination()) == false) {
383 try {
384
385 ActiveMQTopic topic = AdvisorySupport.getFullAdvisoryTopic(destination.getActiveMQDestination());
386 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
387 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_USAGE_NAME, usage.getName());
388 fireAdvisory(context, topic, null, null, advisoryMessage);
389
390 } catch (Exception e) {
391 handleFireFailure("is full", e);
392 }
393 }
394 }
395
396 @Override
397 public void nowMasterBroker() {
398 super.nowMasterBroker();
399 try {
400 ActiveMQTopic topic = AdvisorySupport.getMasterBrokerAdvisoryTopic();
401 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
402 ConnectionContext context = new ConnectionContext();
403 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
404 context.setBroker(getBrokerService().getBroker());
405 fireAdvisory(context, topic,null,null,advisoryMessage);
406 } catch (Exception e) {
407 handleFireFailure("now master broker", e);
408 }
409 }
410
411 @Override
412 public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference,
413 Subscription subscription){
414 super.sendToDeadLetterQueue(context, messageReference, subscription);
415 try {
416 if(!messageReference.isAdvisory()) {
417 ActiveMQTopic topic = AdvisorySupport.getMessageDLQdAdvisoryTopic(messageReference.getMessage().getDestination());
418 Message payload = messageReference.getMessage().copy();
419 payload.clearBody();
420 fireAdvisory(context, topic,payload);
421 }
422 } catch (Exception e) {
423 handleFireFailure("add to DLQ", e);
424 }
425 }
426
427 @Override
428 public void networkBridgeStarted(BrokerInfo brokerInfo, boolean createdByDuplex, String remoteIp) {
429 try {
430 if (brokerInfo != null) {
431 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
432 advisoryMessage.setBooleanProperty("started", true);
433 advisoryMessage.setBooleanProperty("createdByDuplex", createdByDuplex);
434 advisoryMessage.setStringProperty("remoteIp", remoteIp);
435 networkBridges.putIfAbsent(brokerInfo, advisoryMessage);
436
437 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
438
439 ConnectionContext context = new ConnectionContext();
440 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
441 context.setBroker(getBrokerService().getBroker());
442 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
443 }
444 } catch (Exception e) {
445 handleFireFailure("network bridge started", e);
446 }
447 }
448
449 @Override
450 public void networkBridgeStopped(BrokerInfo brokerInfo) {
451 try {
452 if (brokerInfo != null) {
453 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
454 advisoryMessage.setBooleanProperty("started", false);
455 networkBridges.remove(brokerInfo);
456
457 ActiveMQTopic topic = AdvisorySupport.getNetworkBridgeAdvisoryTopic();
458
459 ConnectionContext context = new ConnectionContext();
460 context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
461 context.setBroker(getBrokerService().getBroker());
462 fireAdvisory(context, topic, brokerInfo, null, advisoryMessage);
463 }
464 } catch (Exception e) {
465 handleFireFailure("network bridge stopped", e);
466 }
467 }
468
469 private void handleFireFailure(String message, Throwable cause) {
470 LOG.warn("Failed to fire " + message + " advisory, reason: " + cause);
471 if (LOG.isDebugEnabled()) {
472 LOG.debug(message + " detail", cause);
473 }
474 }
475
476 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
477 fireAdvisory(context, topic, command, null);
478 }
479
480 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
481 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
482 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
483 }
484
485 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception {
486 fireConsumerAdvisory(context, consumerDestination,topic, command, null);
487 }
488
489 protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
490 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
491 int count = 0;
492 Set<Destination>set = getDestinations(consumerDestination);
493 if (set != null) {
494 for (Destination dest:set) {
495 count += dest.getDestinationStatistics().getConsumers().getCount();
496 }
497 }
498 advisoryMessage.setIntProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_COUNT, count);
499
500 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
501 }
502
503 protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
504 fireProducerAdvisory(context,producerDestination, topic, command, null);
505 }
506
507 protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
508 ActiveMQMessage advisoryMessage = new ActiveMQMessage();
509 int count = 0;
510 if (producerDestination != null) {
511 Set<Destination> set = getDestinations(producerDestination);
512 if (set != null) {
513 for (Destination dest : set) {
514 count += dest.getDestinationStatistics().getProducers().getCount();
515 }
516 }
517 }
518 advisoryMessage.setIntProperty("producerCount", count);
519 fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
520 }
521
522 protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId, ActiveMQMessage advisoryMessage) throws Exception {
523 if (getBrokerService().isStarted()) {
524 //set properties
525 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
526 String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
527 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
528
529 String url = getBrokerService().getVmConnectorURI().toString();
530 if (getBrokerService().getDefaultSocketURIString() != null) {
531 url = getBrokerService().getDefaultSocketURIString();
532 }
533 advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
534
535 //set the data structure
536 advisoryMessage.setDataStructure(command);
537 advisoryMessage.setPersistent(false);
538 advisoryMessage.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
539 advisoryMessage.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
540 advisoryMessage.setTargetConsumerId(targetConsumerId);
541 advisoryMessage.setDestination(topic);
542 advisoryMessage.setResponseRequired(false);
543 advisoryMessage.setProducerId(advisoryProducerId);
544 boolean originalFlowControl = context.isProducerFlowControl();
545 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
546 producerExchange.setConnectionContext(context);
547 producerExchange.setMutable(true);
548 producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
549 try {
550 context.setProducerFlowControl(false);
551 next.send(producerExchange, advisoryMessage);
552 } finally {
553 context.setProducerFlowControl(originalFlowControl);
554 }
555 }
556 }
557
558 public Map<ConnectionId, ConnectionInfo> getAdvisoryConnections() {
559 return connections;
560 }
561
562 public Map<ConsumerId, ConsumerInfo> getAdvisoryConsumers() {
563 return consumers;
564 }
565
566 public Map<ProducerId, ProducerInfo> getAdvisoryProducers() {
567 return producers;
568 }
569
570 public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() {
571 return destinations;
572 }
573 }