001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.jmx;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.List;
024 import java.util.Map;
025 import java.util.Map.Entry;
026 import java.util.Set;
027 import java.util.concurrent.ConcurrentHashMap;
028 import java.util.concurrent.CopyOnWriteArraySet;
029 import java.util.concurrent.ExecutorService;
030 import java.util.concurrent.ThreadPoolExecutor;
031
032 import javax.management.InstanceNotFoundException;
033 import javax.management.MalformedObjectNameException;
034 import javax.management.ObjectName;
035 import javax.management.openmbean.CompositeData;
036 import javax.management.openmbean.CompositeDataSupport;
037 import javax.management.openmbean.CompositeType;
038 import javax.management.openmbean.OpenDataException;
039 import javax.management.openmbean.TabularData;
040 import javax.management.openmbean.TabularDataSupport;
041 import javax.management.openmbean.TabularType;
042
043 import org.apache.activemq.broker.Broker;
044 import org.apache.activemq.broker.BrokerService;
045 import org.apache.activemq.broker.ConnectionContext;
046 import org.apache.activemq.broker.ProducerBrokerExchange;
047 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
048 import org.apache.activemq.broker.region.Destination;
049 import org.apache.activemq.broker.region.DestinationFactory;
050 import org.apache.activemq.broker.region.DestinationFactoryImpl;
051 import org.apache.activemq.broker.region.DestinationInterceptor;
052 import org.apache.activemq.broker.region.Queue;
053 import org.apache.activemq.broker.region.Region;
054 import org.apache.activemq.broker.region.RegionBroker;
055 import org.apache.activemq.broker.region.Subscription;
056 import org.apache.activemq.broker.region.Topic;
057 import org.apache.activemq.broker.region.TopicRegion;
058 import org.apache.activemq.broker.region.TopicSubscription;
059 import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
060 import org.apache.activemq.command.ActiveMQDestination;
061 import org.apache.activemq.command.ActiveMQMessage;
062 import org.apache.activemq.command.ActiveMQTopic;
063 import org.apache.activemq.command.ConsumerInfo;
064 import org.apache.activemq.command.Message;
065 import org.apache.activemq.command.MessageId;
066 import org.apache.activemq.command.ProducerInfo;
067 import org.apache.activemq.command.SubscriptionInfo;
068 import org.apache.activemq.store.MessageRecoveryListener;
069 import org.apache.activemq.store.PersistenceAdapter;
070 import org.apache.activemq.store.TopicMessageStore;
071 import org.apache.activemq.thread.Scheduler;
072 import org.apache.activemq.thread.TaskRunnerFactory;
073 import org.apache.activemq.transaction.XATransaction;
074 import org.apache.activemq.usage.SystemUsage;
075 import org.apache.activemq.util.ServiceStopper;
076 import org.apache.activemq.util.SubscriptionKey;
077 import org.slf4j.Logger;
078 import org.slf4j.LoggerFactory;
079
080 public class ManagedRegionBroker extends RegionBroker {
081 private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
082 private final ManagementContext managementContext;
083 private final ObjectName brokerObjectName;
084 private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
085 private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
086 private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
087 private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
088 private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
089 private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
090 private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
091 private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
092 private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
093 private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
094 private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
095 private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
096 private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
097 private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
098 private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
099 private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
100 private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
101 private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
102 /* This is the first broker in the broker interceptor chain. */
103 private Broker contextBroker;
104
105 private final ExecutorService asyncInvokeService;
106 private final long mbeanTimeout;
107
108 public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
109 DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
110 super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
111 this.managementContext = context;
112 this.brokerObjectName = brokerObjectName;
113 this.mbeanTimeout = brokerService.getMbeanInvocationTimeout();
114 this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;;
115 }
116
117 @Override
118 public void start() throws Exception {
119 super.start();
120 // build all existing durable subscriptions
121 buildExistingSubscriptions();
122 }
123
124 @Override
125 protected void doStop(ServiceStopper stopper) {
126 super.doStop(stopper);
127 // lets remove any mbeans not yet removed
128 for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
129 ObjectName name = iter.next();
130 try {
131 managementContext.unregisterMBean(name);
132 } catch (InstanceNotFoundException e) {
133 LOG.warn("The MBean: " + name + " is no longer registered with JMX");
134 } catch (Exception e) {
135 stopper.onException(this, e);
136 }
137 }
138 registeredMBeans.clear();
139 }
140
141 @Override
142 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
143 return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
144 }
145
146 @Override
147 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
148 return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
149 }
150
151 @Override
152 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
153 return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
154 }
155
156 @Override
157 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
158 return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
159 }
160
161 public void register(ActiveMQDestination destName, Destination destination) {
162 // TODO refactor to allow views for custom destinations
163 try {
164 ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
165 DestinationView view;
166 if (destination instanceof Queue) {
167 view = new QueueView(this, (Queue)destination);
168 } else if (destination instanceof Topic) {
169 view = new TopicView(this, (Topic)destination);
170 } else {
171 view = null;
172 LOG.warn("JMX View is not supported for custom destination: " + destination);
173 }
174 if (view != null) {
175 registerDestination(objectName, destName, view);
176 }
177 } catch (Exception e) {
178 LOG.error("Failed to register destination " + destName, e);
179 }
180 }
181
182 public void unregister(ActiveMQDestination destName) {
183 try {
184 ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
185 unregisterDestination(objectName);
186 } catch (Exception e) {
187 LOG.error("Failed to unregister " + destName, e);
188 }
189 }
190
191 public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
192 String connectionClientId = context.getClientId();
193
194 SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
195 try {
196 ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, sub.getConsumerInfo());
197 SubscriptionView view;
198 if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
199 // add offline subscribers to inactive list
200 SubscriptionInfo info = new SubscriptionInfo();
201 info.setClientId(context.getClientId());
202 info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
203 info.setDestination(sub.getConsumerInfo().getDestination());
204 info.setSelector(sub.getSelector());
205 addInactiveSubscription(key, info, sub);
206 } else {
207 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
208 if (sub.getConsumerInfo().isDurable()) {
209 view = new DurableSubscriptionView(this, context.getClientId(), userName, sub);
210 } else {
211 if (sub instanceof TopicSubscription) {
212 view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
213 } else {
214 view = new SubscriptionView(context.getClientId(), userName, sub);
215 }
216 }
217 registerSubscription(objectName, sub.getConsumerInfo(), key, view);
218 }
219 subscriptionMap.put(sub, objectName);
220 return objectName;
221 } catch (Exception e) {
222 LOG.error("Failed to register subscription " + sub, e);
223 return null;
224 }
225 }
226
227 @Override
228 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
229 Subscription sub = super.addConsumer(context, info);
230 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
231 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
232 if (inactiveName != null) {
233 // if it was inactive, register it
234 registerSubscription(context, sub);
235 }
236 return sub;
237 }
238
239 @Override
240 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
241 for (Subscription sub : subscriptionMap.keySet()) {
242 if (sub.getConsumerInfo().equals(info)) {
243 // unregister all consumer subs
244 unregisterSubscription(subscriptionMap.get(sub), true);
245 }
246 }
247 super.removeConsumer(context, info);
248 }
249
250 @Override
251 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
252 super.addProducer(context, info);
253 String connectionClientId = context.getClientId();
254 ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
255 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
256 ProducerView view = new ProducerView(info, connectionClientId, userName, this);
257 registerProducer(objectName, info.getDestination(), view);
258 }
259
260 @Override
261 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
262 ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
263 unregisterProducer(objectName);
264 super.removeProducer(context, info);
265 }
266
267 @Override
268 public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
269 if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
270 ProducerInfo info = exchange.getProducerState().getInfo();
271 if (info.getDestination() == null && info.getProducerId() != null) {
272 ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, exchange.getConnectionContext().getClientId(), info);
273 ProducerView view = this.dynamicDestinationProducers.get(objectName);
274 if (view != null) {
275 ActiveMQDestination dest = message.getDestination();
276 if (dest != null) {
277 view.setLastUsedDestinationName(dest);
278 }
279 }
280 }
281 }
282 super.send(exchange, message);
283 }
284
285 public void unregisterSubscription(Subscription sub) {
286 ObjectName name = subscriptionMap.remove(sub);
287 if (name != null) {
288 try {
289 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
290 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
291 if (inactiveName != null) {
292 inactiveDurableTopicSubscribers.remove(inactiveName);
293 managementContext.unregisterMBean(inactiveName);
294 }
295 } catch (Exception e) {
296 LOG.error("Failed to unregister subscription " + sub, e);
297 }
298 }
299 }
300
301 protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
302 if (dest.isQueue()) {
303 if (dest.isTemporary()) {
304 temporaryQueues.put(key, view);
305 } else {
306 queues.put(key, view);
307 }
308 } else {
309 if (dest.isTemporary()) {
310 temporaryTopics.put(key, view);
311 } else {
312 topics.put(key, view);
313 }
314 }
315 try {
316 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
317 registeredMBeans.add(key);
318 } catch (Throwable e) {
319 LOG.warn("Failed to register MBean: " + key);
320 LOG.debug("Failure reason: " + e, e);
321 }
322 }
323
324 protected void unregisterDestination(ObjectName key) throws Exception {
325
326 DestinationView view = removeAndRemember(topics, key, null);
327 view = removeAndRemember(queues, key, view);
328 view = removeAndRemember(temporaryQueues, key, view);
329 view = removeAndRemember(temporaryTopics, key, view);
330 if (registeredMBeans.remove(key)) {
331 try {
332 managementContext.unregisterMBean(key);
333 } catch (Throwable e) {
334 LOG.warn("Failed to unregister MBean: " + key);
335 LOG.debug("Failure reason: " + e, e);
336 }
337 }
338 if (view != null) {
339 key = view.getSlowConsumerStrategy();
340 if (key!= null && registeredMBeans.remove(key)) {
341 try {
342 managementContext.unregisterMBean(key);
343 } catch (Throwable e) {
344 LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
345 LOG.debug("Failure reason: " + e, e);
346 }
347 }
348 }
349 }
350
351 protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {
352
353 if (dest != null) {
354 if (dest.isQueue()) {
355 if (dest.isTemporary()) {
356 temporaryQueueProducers.put(key, view);
357 } else {
358 queueProducers.put(key, view);
359 }
360 } else {
361 if (dest.isTemporary()) {
362 temporaryTopicProducers.put(key, view);
363 } else {
364 topicProducers.put(key, view);
365 }
366 }
367 } else {
368 dynamicDestinationProducers.put(key, view);
369 }
370
371 try {
372 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
373 registeredMBeans.add(key);
374 } catch (Throwable e) {
375 LOG.warn("Failed to register MBean: " + key);
376 LOG.debug("Failure reason: " + e, e);
377 }
378 }
379
380 protected void unregisterProducer(ObjectName key) throws Exception {
381 queueProducers.remove(key);
382 topicProducers.remove(key);
383 temporaryQueueProducers.remove(key);
384 temporaryTopicProducers.remove(key);
385 dynamicDestinationProducers.remove(key);
386 if (registeredMBeans.remove(key)) {
387 try {
388 managementContext.unregisterMBean(key);
389 } catch (Throwable e) {
390 LOG.warn("Failed to unregister MBean: " + key);
391 LOG.debug("Failure reason: " + e, e);
392 }
393 }
394 }
395
396 private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
397 DestinationView candidate = map.remove(key);
398 if (candidate != null && view == null) {
399 view = candidate;
400 }
401 return candidate != null ? candidate : view;
402 }
403
404 protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
405 ActiveMQDestination dest = info.getDestination();
406 if (dest.isQueue()) {
407 if (dest.isTemporary()) {
408 temporaryQueueSubscribers.put(key, view);
409 } else {
410 queueSubscribers.put(key, view);
411 }
412 } else {
413 if (dest.isTemporary()) {
414 temporaryTopicSubscribers.put(key, view);
415 } else {
416 if (info.isDurable()) {
417 durableTopicSubscribers.put(key, view);
418 // unregister any inactive durable subs
419 try {
420 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
421 if (inactiveName != null) {
422 inactiveDurableTopicSubscribers.remove(inactiveName);
423 registeredMBeans.remove(inactiveName);
424 managementContext.unregisterMBean(inactiveName);
425 }
426 } catch (Throwable e) {
427 LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
428 }
429 } else {
430 topicSubscribers.put(key, view);
431 }
432 }
433 }
434
435 try {
436 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
437 registeredMBeans.add(key);
438 } catch (Throwable e) {
439 LOG.warn("Failed to register MBean: " + key);
440 LOG.debug("Failure reason: " + e, e);
441 }
442
443 }
444
445 protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
446 queueSubscribers.remove(key);
447 topicSubscribers.remove(key);
448 temporaryQueueSubscribers.remove(key);
449 temporaryTopicSubscribers.remove(key);
450 if (registeredMBeans.remove(key)) {
451 try {
452 managementContext.unregisterMBean(key);
453 } catch (Throwable e) {
454 LOG.warn("Failed to unregister MBean: " + key);
455 LOG.debug("Failure reason: " + e, e);
456 }
457 }
458 DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
459 if (view != null) {
460 // need to put this back in the inactive list
461 SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
462 if (addToInactive) {
463 SubscriptionInfo info = new SubscriptionInfo();
464 info.setClientId(subscriptionKey.getClientId());
465 info.setSubscriptionName(subscriptionKey.getSubscriptionName());
466 info.setDestination(new ActiveMQTopic(view.getDestinationName()));
467 info.setSelector(view.getSelector());
468 addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
469 }
470 }
471 }
472
473 protected void buildExistingSubscriptions() throws Exception {
474 Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
475 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
476 if (destinations != null) {
477 for (ActiveMQDestination dest : destinations) {
478 if (dest.isTopic()) {
479 SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
480 if (infos != null) {
481 for (int i = 0; i < infos.length; i++) {
482 SubscriptionInfo info = infos[i];
483 SubscriptionKey key = new SubscriptionKey(info);
484 if (!alreadyKnown(key)) {
485 LOG.debug("Restoring durable subscription mbean: " + info);
486 subscriptions.put(key, info);
487 }
488 }
489 }
490 }
491 }
492 }
493
494 for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
495 addInactiveSubscription(entry.getKey(), entry.getValue(), null);
496 }
497 }
498
499 private boolean alreadyKnown(SubscriptionKey key) {
500 boolean known = false;
501 known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
502 if (LOG.isTraceEnabled()) {
503 LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") + " already registered");
504 }
505 return known;
506 }
507
508 protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
509 try {
510 ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
511 ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo);
512 SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
513
514 try {
515 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
516 registeredMBeans.add(objectName);
517 } catch (Throwable e) {
518 LOG.warn("Failed to register MBean: " + key);
519 LOG.debug("Failure reason: " + e, e);
520 }
521
522 inactiveDurableTopicSubscribers.put(objectName, view);
523 subscriptionKeys.put(key, objectName);
524 } catch (Exception e) {
525 LOG.error("Failed to register subscription " + info, e);
526 }
527 }
528
529 public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
530 List<Message> messages = getSubscriberMessages(view);
531 CompositeData c[] = new CompositeData[messages.size()];
532 for (int i = 0; i < c.length; i++) {
533 try {
534 c[i] = OpenTypeSupport.convert(messages.get(i));
535 } catch (Throwable e) {
536 LOG.error("failed to browse : " + view, e);
537 }
538 }
539 return c;
540 }
541
542 public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
543 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
544 List<Message> messages = getSubscriberMessages(view);
545 CompositeType ct = factory.getCompositeType();
546 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
547 TabularDataSupport rc = new TabularDataSupport(tt);
548 for (int i = 0; i < messages.size(); i++) {
549 rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
550 }
551 return rc;
552 }
553
554 protected List<Message> getSubscriberMessages(SubscriptionView view) {
555 // TODO It is very dangerous operation for big backlogs
556 if (!(destinationFactory instanceof DestinationFactoryImpl)) {
557 throw new RuntimeException("unsupported by " + destinationFactory);
558 }
559 PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
560 final List<Message> result = new ArrayList<Message>();
561 try {
562 ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
563 TopicMessageStore store = adapter.createTopicMessageStore(topic);
564 store.recover(new MessageRecoveryListener() {
565 @Override
566 public boolean recoverMessage(Message message) throws Exception {
567 result.add(message);
568 return true;
569 }
570
571 @Override
572 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
573 throw new RuntimeException("Should not be called.");
574 }
575
576 @Override
577 public boolean hasSpace() {
578 return true;
579 }
580
581 @Override
582 public boolean isDuplicate(MessageId id) {
583 return false;
584 }
585 });
586 } catch (Throwable e) {
587 LOG.error("Failed to browse messages for Subscription " + view, e);
588 }
589 return result;
590
591 }
592
593 protected ObjectName[] getTopics() {
594 Set<ObjectName> set = topics.keySet();
595 return set.toArray(new ObjectName[set.size()]);
596 }
597
598 protected ObjectName[] getQueues() {
599 Set<ObjectName> set = queues.keySet();
600 return set.toArray(new ObjectName[set.size()]);
601 }
602
603 protected ObjectName[] getTemporaryTopics() {
604 Set<ObjectName> set = temporaryTopics.keySet();
605 return set.toArray(new ObjectName[set.size()]);
606 }
607
608 protected ObjectName[] getTemporaryQueues() {
609 Set<ObjectName> set = temporaryQueues.keySet();
610 return set.toArray(new ObjectName[set.size()]);
611 }
612
613 protected ObjectName[] getTopicSubscribers() {
614 Set<ObjectName> set = topicSubscribers.keySet();
615 return set.toArray(new ObjectName[set.size()]);
616 }
617
618 protected ObjectName[] getDurableTopicSubscribers() {
619 Set<ObjectName> set = durableTopicSubscribers.keySet();
620 return set.toArray(new ObjectName[set.size()]);
621 }
622
623 protected ObjectName[] getQueueSubscribers() {
624 Set<ObjectName> set = queueSubscribers.keySet();
625 return set.toArray(new ObjectName[set.size()]);
626 }
627
628 protected ObjectName[] getTemporaryTopicSubscribers() {
629 Set<ObjectName> set = temporaryTopicSubscribers.keySet();
630 return set.toArray(new ObjectName[set.size()]);
631 }
632
633 protected ObjectName[] getTemporaryQueueSubscribers() {
634 Set<ObjectName> set = temporaryQueueSubscribers.keySet();
635 return set.toArray(new ObjectName[set.size()]);
636 }
637
638 protected ObjectName[] getInactiveDurableTopicSubscribers() {
639 Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
640 return set.toArray(new ObjectName[set.size()]);
641 }
642
643 protected ObjectName[] getTopicProducers() {
644 Set<ObjectName> set = topicProducers.keySet();
645 return set.toArray(new ObjectName[set.size()]);
646 }
647
648 protected ObjectName[] getQueueProducers() {
649 Set<ObjectName> set = queueProducers.keySet();
650 return set.toArray(new ObjectName[set.size()]);
651 }
652
653 protected ObjectName[] getTemporaryTopicProducers() {
654 Set<ObjectName> set = temporaryTopicProducers.keySet();
655 return set.toArray(new ObjectName[set.size()]);
656 }
657
658 protected ObjectName[] getTemporaryQueueProducers() {
659 Set<ObjectName> set = temporaryQueueProducers.keySet();
660 return set.toArray(new ObjectName[set.size()]);
661 }
662
663 protected ObjectName[] getDynamicDestinationProducers() {
664 Set<ObjectName> set = dynamicDestinationProducers.keySet();
665 return set.toArray(new ObjectName[set.size()]);
666 }
667
668 public Broker getContextBroker() {
669 return contextBroker;
670 }
671
672 public void setContextBroker(Broker contextBroker) {
673 this.contextBroker = contextBroker;
674 }
675
676 public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
677 ObjectName objectName = null;
678 try {
679 objectName = BrokerMBeanSupport.createAbortSlowConsumerStrategyName(brokerObjectName, strategy);
680 if (!registeredMBeans.contains(objectName)) {
681 AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
682 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
683 registeredMBeans.add(objectName);
684 }
685 } catch (Exception e) {
686 LOG.warn("Failed to register MBean: " + strategy);
687 LOG.debug("Failure reason: " + e, e);
688 }
689 return objectName;
690 }
691
692 public void registerRecoveredTransactionMBean(XATransaction transaction) {
693 try {
694 ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
695 if (!registeredMBeans.contains(objectName)) {
696 RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
697 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
698 registeredMBeans.add(objectName);
699 }
700 } catch (Exception e) {
701 LOG.warn("Failed to register prepared transaction MBean: " + transaction);
702 LOG.debug("Failure reason: " + e, e);
703 }
704 }
705
706 public void unregister(XATransaction transaction) {
707 try {
708 ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
709 if (registeredMBeans.remove(objectName)) {
710 try {
711 managementContext.unregisterMBean(objectName);
712 } catch (Throwable e) {
713 LOG.warn("Failed to unregister MBean: " + objectName);
714 LOG.debug("Failure reason: " + e, e);
715 }
716 }
717 } catch (Exception e) {
718 LOG.warn("Failed to create object name to unregister " + transaction, e);
719 }
720 }
721
722 public ObjectName getSubscriberObjectName(Subscription key) {
723 return subscriptionMap.get(key);
724 }
725
726 public Subscription getSubscriber(ObjectName key) {
727 Subscription sub = null;
728 for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
729 if (entry.getValue().equals(key)) {
730 sub = entry.getKey();
731 break;
732 }
733 }
734 return sub;
735 }
736
737 public Map<ObjectName, DestinationView> getQueueViews() {
738 return queues;
739 }
740 }