001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.jmx;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.Hashtable;
023 import java.util.Iterator;
024 import java.util.List;
025 import java.util.Map;
026 import java.util.Map.Entry;
027 import java.util.Set;
028 import java.util.concurrent.ConcurrentHashMap;
029 import java.util.concurrent.CopyOnWriteArraySet;
030 import java.util.concurrent.ExecutorService;
031 import java.util.concurrent.ThreadPoolExecutor;
032
033 import javax.management.InstanceNotFoundException;
034 import javax.management.MalformedObjectNameException;
035 import javax.management.ObjectName;
036 import javax.management.openmbean.CompositeData;
037 import javax.management.openmbean.CompositeDataSupport;
038 import javax.management.openmbean.CompositeType;
039 import javax.management.openmbean.OpenDataException;
040 import javax.management.openmbean.TabularData;
041 import javax.management.openmbean.TabularDataSupport;
042 import javax.management.openmbean.TabularType;
043
044 import org.apache.activemq.broker.Broker;
045 import org.apache.activemq.broker.BrokerService;
046 import org.apache.activemq.broker.ConnectionContext;
047 import org.apache.activemq.broker.ProducerBrokerExchange;
048 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
049 import org.apache.activemq.broker.region.Destination;
050 import org.apache.activemq.broker.region.DestinationFactory;
051 import org.apache.activemq.broker.region.DestinationFactoryImpl;
052 import org.apache.activemq.broker.region.DestinationInterceptor;
053 import org.apache.activemq.broker.region.Queue;
054 import org.apache.activemq.broker.region.Region;
055 import org.apache.activemq.broker.region.RegionBroker;
056 import org.apache.activemq.broker.region.Subscription;
057 import org.apache.activemq.broker.region.Topic;
058 import org.apache.activemq.broker.region.TopicRegion;
059 import org.apache.activemq.broker.region.TopicSubscription;
060 import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
061 import org.apache.activemq.command.ActiveMQDestination;
062 import org.apache.activemq.command.ActiveMQMessage;
063 import org.apache.activemq.command.ActiveMQTopic;
064 import org.apache.activemq.command.ConsumerInfo;
065 import org.apache.activemq.command.Message;
066 import org.apache.activemq.command.MessageId;
067 import org.apache.activemq.command.ProducerInfo;
068 import org.apache.activemq.command.SubscriptionInfo;
069 import org.apache.activemq.store.MessageRecoveryListener;
070 import org.apache.activemq.store.PersistenceAdapter;
071 import org.apache.activemq.store.TopicMessageStore;
072 import org.apache.activemq.thread.Scheduler;
073 import org.apache.activemq.thread.TaskRunnerFactory;
074 import org.apache.activemq.transaction.XATransaction;
075 import org.apache.activemq.usage.SystemUsage;
076 import org.apache.activemq.util.JMXSupport;
077 import org.apache.activemq.util.ServiceStopper;
078 import org.apache.activemq.util.SubscriptionKey;
079 import org.slf4j.Logger;
080 import org.slf4j.LoggerFactory;
081
082 public class ManagedRegionBroker extends RegionBroker {
083 private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
084 private final ManagementContext managementContext;
085 private final ObjectName brokerObjectName;
086 private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
087 private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
088 private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
089 private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
090 private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
091 private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
092 private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
093 private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
094 private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
095 private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
096 private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
097 private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
098 private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
099 private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
100 private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
101 private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
102 private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
103 private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
104 /* This is the first broker in the broker interceptor chain. */
105 private Broker contextBroker;
106
107 private final ExecutorService asyncInvokeService;
108 private final long mbeanTimeout;
109
110 public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
111 DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
112 super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
113 this.managementContext = context;
114 this.brokerObjectName = brokerObjectName;
115 this.mbeanTimeout = brokerService.getMbeanInvocationTimeout();
116 this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;;
117 }
118
119 @Override
120 public void start() throws Exception {
121 super.start();
122 // build all existing durable subscriptions
123 buildExistingSubscriptions();
124 }
125
126 @Override
127 protected void doStop(ServiceStopper stopper) {
128 super.doStop(stopper);
129 // lets remove any mbeans not yet removed
130 for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
131 ObjectName name = iter.next();
132 try {
133 managementContext.unregisterMBean(name);
134 } catch (InstanceNotFoundException e) {
135 LOG.warn("The MBean: " + name + " is no longer registered with JMX");
136 } catch (Exception e) {
137 stopper.onException(this, e);
138 }
139 }
140 registeredMBeans.clear();
141 }
142
143 @Override
144 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
145 return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
146 }
147
148 @Override
149 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
150 return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
151 }
152
153 @Override
154 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
155 return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
156 }
157
158 @Override
159 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
160 return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
161 }
162
163 public void register(ActiveMQDestination destName, Destination destination) {
164 // TODO refactor to allow views for custom destinations
165 try {
166 ObjectName objectName = createObjectName(destName);
167 DestinationView view;
168 if (destination instanceof Queue) {
169 view = new QueueView(this, (Queue)destination);
170 } else if (destination instanceof Topic) {
171 view = new TopicView(this, (Topic)destination);
172 } else {
173 view = null;
174 LOG.warn("JMX View is not supported for custom destination: " + destination);
175 }
176 if (view != null) {
177 registerDestination(objectName, destName, view);
178 }
179 } catch (Exception e) {
180 LOG.error("Failed to register destination " + destName, e);
181 }
182 }
183
184 public void unregister(ActiveMQDestination destName) {
185 try {
186 ObjectName objectName = createObjectName(destName);
187 unregisterDestination(objectName);
188 } catch (Exception e) {
189 LOG.error("Failed to unregister " + destName, e);
190 }
191 }
192
193 public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
194 String connectionClientId = context.getClientId();
195 ObjectName brokerJmxObjectName = brokerObjectName;
196 String objectNameStr = getSubscriptionObjectName(sub.getConsumerInfo(), connectionClientId, brokerJmxObjectName);
197 SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
198 try {
199 ObjectName objectName = new ObjectName(objectNameStr);
200 SubscriptionView view;
201 if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
202 // add offline subscribers to inactive list
203 SubscriptionInfo info = new SubscriptionInfo();
204 info.setClientId(context.getClientId());
205 info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
206 info.setDestination(sub.getConsumerInfo().getDestination());
207 info.setSelector(sub.getSelector());
208 addInactiveSubscription(key, info, sub);
209 } else {
210 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
211 if (sub.getConsumerInfo().isDurable()) {
212 view = new DurableSubscriptionView(this, context.getClientId(), userName, sub);
213 } else {
214 if (sub instanceof TopicSubscription) {
215 view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
216 } else {
217 view = new SubscriptionView(context.getClientId(), userName, sub);
218 }
219 }
220 registerSubscription(objectName, sub.getConsumerInfo(), key, view);
221 }
222 subscriptionMap.put(sub, objectName);
223 return objectName;
224 } catch (Exception e) {
225 LOG.error("Failed to register subscription " + sub, e);
226 return null;
227 }
228 }
229
230 public static String getSubscriptionObjectName(ConsumerInfo info, String connectionClientId, ObjectName brokerJmxObjectName) {
231 Hashtable<String, String> map = brokerJmxObjectName.getKeyPropertyList();
232 String brokerDomain = brokerJmxObjectName.getDomain();
233 String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
234 String destinationType = "destinationType=" + info.getDestination().getDestinationTypeAsString();
235 String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(info.getDestination().getPhysicalName());
236 String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
237 String persistentMode = "persistentMode=";
238 String consumerId = "";
239 if (info.isDurable()) {
240 persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(info.getSubscriptionName());
241 } else {
242 persistentMode += "Non-Durable";
243 if (info.getConsumerId() != null) {
244 consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(info.getConsumerId().toString());
245 }
246 }
247 objectNameStr += persistentMode + ",";
248 objectNameStr += destinationType + ",";
249 objectNameStr += destinationName + ",";
250 objectNameStr += clientId;
251 objectNameStr += consumerId;
252 return objectNameStr;
253 }
254
255 @Override
256 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
257 Subscription sub = super.addConsumer(context, info);
258 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
259 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
260 if (inactiveName != null) {
261 // if it was inactive, register it
262 registerSubscription(context, sub);
263 }
264 return sub;
265 }
266
267 @Override
268 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
269 for (Subscription sub : subscriptionMap.keySet()) {
270 if (sub.getConsumerInfo().equals(info)) {
271 // unregister all consumer subs
272 unregisterSubscription(subscriptionMap.get(sub), true);
273 }
274 }
275 super.removeConsumer(context, info);
276 }
277
278 @Override
279 public void addProducer(ConnectionContext context, ProducerInfo info)
280 throws Exception {
281 super.addProducer(context, info);
282 String connectionClientId = context.getClientId();
283 ObjectName objectName = createObjectName(info, connectionClientId);
284 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
285 ProducerView view = new ProducerView(info, connectionClientId, userName, this);
286 registerProducer(objectName, info.getDestination(), view);
287 }
288
289 @Override
290 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
291 ObjectName objectName = createObjectName(info, context.getClientId());
292 unregisterProducer(objectName);
293 super.removeProducer(context, info);
294 }
295
296 @Override
297 public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
298 if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
299 ProducerInfo info = exchange.getProducerState().getInfo();
300 if (info.getDestination() == null && info.getProducerId() != null) {
301 ObjectName objectName = createObjectName(info, exchange.getConnectionContext().getClientId());
302 ProducerView view = this.dynamicDestinationProducers.get(objectName);
303 if (view != null) {
304 ActiveMQDestination dest = message.getDestination();
305 if (dest != null) {
306 view.setLastUsedDestinationName(dest);
307 }
308 }
309 }
310 }
311 super.send(exchange, message);
312 }
313
314 public void unregisterSubscription(Subscription sub) {
315 ObjectName name = subscriptionMap.remove(sub);
316 if (name != null) {
317 try {
318 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
319 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
320 if (inactiveName != null) {
321 inactiveDurableTopicSubscribers.remove(inactiveName);
322 managementContext.unregisterMBean(inactiveName);
323 }
324 } catch (Exception e) {
325 LOG.error("Failed to unregister subscription " + sub, e);
326 }
327 }
328 }
329
330 protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
331 if (dest.isQueue()) {
332 if (dest.isTemporary()) {
333 temporaryQueues.put(key, view);
334 } else {
335 queues.put(key, view);
336 }
337 } else {
338 if (dest.isTemporary()) {
339 temporaryTopics.put(key, view);
340 } else {
341 topics.put(key, view);
342 }
343 }
344 try {
345 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
346 registeredMBeans.add(key);
347 } catch (Throwable e) {
348 LOG.warn("Failed to register MBean: " + key);
349 LOG.debug("Failure reason: " + e, e);
350 }
351 }
352
353 protected void unregisterDestination(ObjectName key) throws Exception {
354
355 DestinationView view = removeAndRemember(topics, key, null);
356 view = removeAndRemember(queues, key, view);
357 view = removeAndRemember(temporaryQueues, key, view);
358 view = removeAndRemember(temporaryTopics, key, view);
359 if (registeredMBeans.remove(key)) {
360 try {
361 managementContext.unregisterMBean(key);
362 } catch (Throwable e) {
363 LOG.warn("Failed to unregister MBean: " + key);
364 LOG.debug("Failure reason: " + e, e);
365 }
366 }
367 if (view != null) {
368 key = view.getSlowConsumerStrategy();
369 if (key!= null && registeredMBeans.remove(key)) {
370 try {
371 managementContext.unregisterMBean(key);
372 } catch (Throwable e) {
373 LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
374 LOG.debug("Failure reason: " + e, e);
375 }
376 }
377 }
378 }
379
380 protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {
381
382 if (dest != null) {
383 if (dest.isQueue()) {
384 if (dest.isTemporary()) {
385 temporaryQueueProducers.put(key, view);
386 } else {
387 queueProducers.put(key, view);
388 }
389 } else {
390 if (dest.isTemporary()) {
391 temporaryTopicProducers.put(key, view);
392 } else {
393 topicProducers.put(key, view);
394 }
395 }
396 } else {
397 dynamicDestinationProducers.put(key, view);
398 }
399
400 try {
401 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
402 registeredMBeans.add(key);
403 } catch (Throwable e) {
404 LOG.warn("Failed to register MBean: " + key);
405 LOG.debug("Failure reason: " + e, e);
406 }
407 }
408
409 protected void unregisterProducer(ObjectName key) throws Exception {
410 queueProducers.remove(key);
411 topicProducers.remove(key);
412 temporaryQueueProducers.remove(key);
413 temporaryTopicProducers.remove(key);
414 dynamicDestinationProducers.remove(key);
415 if (registeredMBeans.remove(key)) {
416 try {
417 managementContext.unregisterMBean(key);
418 } catch (Throwable e) {
419 LOG.warn("Failed to unregister MBean: " + key);
420 LOG.debug("Failure reason: " + e, e);
421 }
422 }
423 }
424
425 private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
426 DestinationView candidate = map.remove(key);
427 if (candidate != null && view == null) {
428 view = candidate;
429 }
430 return candidate != null ? candidate : view;
431 }
432
433 protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
434 ActiveMQDestination dest = info.getDestination();
435 if (dest.isQueue()) {
436 if (dest.isTemporary()) {
437 temporaryQueueSubscribers.put(key, view);
438 } else {
439 queueSubscribers.put(key, view);
440 }
441 } else {
442 if (dest.isTemporary()) {
443 temporaryTopicSubscribers.put(key, view);
444 } else {
445 if (info.isDurable()) {
446 durableTopicSubscribers.put(key, view);
447 // unregister any inactive durable subs
448 try {
449 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
450 if (inactiveName != null) {
451 inactiveDurableTopicSubscribers.remove(inactiveName);
452 registeredMBeans.remove(inactiveName);
453 managementContext.unregisterMBean(inactiveName);
454 }
455 } catch (Throwable e) {
456 LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
457 }
458 } else {
459 topicSubscribers.put(key, view);
460 }
461 }
462 }
463
464 try {
465 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
466 registeredMBeans.add(key);
467 } catch (Throwable e) {
468 LOG.warn("Failed to register MBean: " + key);
469 LOG.debug("Failure reason: " + e, e);
470 }
471
472 }
473
474 protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
475 queueSubscribers.remove(key);
476 topicSubscribers.remove(key);
477 temporaryQueueSubscribers.remove(key);
478 temporaryTopicSubscribers.remove(key);
479 if (registeredMBeans.remove(key)) {
480 try {
481 managementContext.unregisterMBean(key);
482 } catch (Throwable e) {
483 LOG.warn("Failed to unregister MBean: " + key);
484 LOG.debug("Failure reason: " + e, e);
485 }
486 }
487 DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
488 if (view != null) {
489 // need to put this back in the inactive list
490 SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
491 if (addToInactive) {
492 SubscriptionInfo info = new SubscriptionInfo();
493 info.setClientId(subscriptionKey.getClientId());
494 info.setSubscriptionName(subscriptionKey.getSubscriptionName());
495 info.setDestination(new ActiveMQTopic(view.getDestinationName()));
496 info.setSelector(view.getSelector());
497 addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
498 }
499 }
500 }
501
502 protected void buildExistingSubscriptions() throws Exception {
503 Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
504 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
505 if (destinations != null) {
506 for (ActiveMQDestination dest : destinations) {
507 if (dest.isTopic()) {
508 SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
509 if (infos != null) {
510 for (int i = 0; i < infos.length; i++) {
511 SubscriptionInfo info = infos[i];
512 SubscriptionKey key = new SubscriptionKey(info);
513 if (!alreadyKnown(key)) {
514 LOG.debug("Restoring durable subscription mbean: " + info);
515 subscriptions.put(key, info);
516 }
517 }
518 }
519 }
520 }
521 }
522
523 for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
524 addInactiveSubscription(entry.getKey(), entry.getValue(), null);
525 }
526 }
527
528 private boolean alreadyKnown(SubscriptionKey key) {
529 boolean known = false;
530 known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
531 if (LOG.isTraceEnabled()) {
532 LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") + " already registered");
533 }
534 return known;
535 }
536
537 protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
538 try {
539 ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
540 ObjectName objectName = new ObjectName(getSubscriptionObjectName(offlineConsumerInfo, info.getClientId(), brokerObjectName));
541 SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
542
543 try {
544 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
545 registeredMBeans.add(objectName);
546 } catch (Throwable e) {
547 LOG.warn("Failed to register MBean: " + key);
548 LOG.debug("Failure reason: " + e, e);
549 }
550
551 inactiveDurableTopicSubscribers.put(objectName, view);
552 subscriptionKeys.put(key, objectName);
553 } catch (Exception e) {
554 LOG.error("Failed to register subscription " + info, e);
555 }
556 }
557
558 public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
559 List<Message> messages = getSubscriberMessages(view);
560 CompositeData c[] = new CompositeData[messages.size()];
561 for (int i = 0; i < c.length; i++) {
562 try {
563 c[i] = OpenTypeSupport.convert(messages.get(i));
564 } catch (Throwable e) {
565 LOG.error("failed to browse : " + view, e);
566 }
567 }
568 return c;
569 }
570
571 public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
572 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
573 List<Message> messages = getSubscriberMessages(view);
574 CompositeType ct = factory.getCompositeType();
575 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
576 TabularDataSupport rc = new TabularDataSupport(tt);
577 for (int i = 0; i < messages.size(); i++) {
578 rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
579 }
580 return rc;
581 }
582
583 protected List<Message> getSubscriberMessages(SubscriptionView view) {
584 // TODO It is very dangerous operation for big backlogs
585 if (!(destinationFactory instanceof DestinationFactoryImpl)) {
586 throw new RuntimeException("unsupported by " + destinationFactory);
587 }
588 PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
589 final List<Message> result = new ArrayList<Message>();
590 try {
591 ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
592 TopicMessageStore store = adapter.createTopicMessageStore(topic);
593 store.recover(new MessageRecoveryListener() {
594 public boolean recoverMessage(Message message) throws Exception {
595 result.add(message);
596 return true;
597 }
598
599 public boolean recoverMessageReference(MessageId messageReference) throws Exception {
600 throw new RuntimeException("Should not be called.");
601 }
602
603 public boolean hasSpace() {
604 return true;
605 }
606
607 public boolean isDuplicate(MessageId id) {
608 return false;
609 }
610 });
611 } catch (Throwable e) {
612 LOG.error("Failed to browse messages for Subscription " + view, e);
613 }
614 return result;
615
616 }
617
618 protected ObjectName[] getTopics() {
619 Set<ObjectName> set = topics.keySet();
620 return set.toArray(new ObjectName[set.size()]);
621 }
622
623 protected ObjectName[] getQueues() {
624 Set<ObjectName> set = queues.keySet();
625 return set.toArray(new ObjectName[set.size()]);
626 }
627
628 protected ObjectName[] getTemporaryTopics() {
629 Set<ObjectName> set = temporaryTopics.keySet();
630 return set.toArray(new ObjectName[set.size()]);
631 }
632
633 protected ObjectName[] getTemporaryQueues() {
634 Set<ObjectName> set = temporaryQueues.keySet();
635 return set.toArray(new ObjectName[set.size()]);
636 }
637
638 protected ObjectName[] getTopicSubscribers() {
639 Set<ObjectName> set = topicSubscribers.keySet();
640 return set.toArray(new ObjectName[set.size()]);
641 }
642
643 protected ObjectName[] getDurableTopicSubscribers() {
644 Set<ObjectName> set = durableTopicSubscribers.keySet();
645 return set.toArray(new ObjectName[set.size()]);
646 }
647
648 protected ObjectName[] getQueueSubscribers() {
649 Set<ObjectName> set = queueSubscribers.keySet();
650 return set.toArray(new ObjectName[set.size()]);
651 }
652
653 protected ObjectName[] getTemporaryTopicSubscribers() {
654 Set<ObjectName> set = temporaryTopicSubscribers.keySet();
655 return set.toArray(new ObjectName[set.size()]);
656 }
657
658 protected ObjectName[] getTemporaryQueueSubscribers() {
659 Set<ObjectName> set = temporaryQueueSubscribers.keySet();
660 return set.toArray(new ObjectName[set.size()]);
661 }
662
663 protected ObjectName[] getInactiveDurableTopicSubscribers() {
664 Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
665 return set.toArray(new ObjectName[set.size()]);
666 }
667
668 protected ObjectName[] getTopicProducers() {
669 Set<ObjectName> set = topicProducers.keySet();
670 return set.toArray(new ObjectName[set.size()]);
671 }
672
673 protected ObjectName[] getQueueProducers() {
674 Set<ObjectName> set = queueProducers.keySet();
675 return set.toArray(new ObjectName[set.size()]);
676 }
677
678 protected ObjectName[] getTemporaryTopicProducers() {
679 Set<ObjectName> set = temporaryTopicProducers.keySet();
680 return set.toArray(new ObjectName[set.size()]);
681 }
682
683 protected ObjectName[] getTemporaryQueueProducers() {
684 Set<ObjectName> set = temporaryQueueProducers.keySet();
685 return set.toArray(new ObjectName[set.size()]);
686 }
687
688 protected ObjectName[] getDynamicDestinationProducers() {
689 Set<ObjectName> set = dynamicDestinationProducers.keySet();
690 return set.toArray(new ObjectName[set.size()]);
691 }
692
693 public Broker getContextBroker() {
694 return contextBroker;
695 }
696
697 public void setContextBroker(Broker contextBroker) {
698 this.contextBroker = contextBroker;
699 }
700
701 protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
702 // Build the object name for the destination
703 Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
704 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type="
705 + JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination="
706 + JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
707 return objectName;
708 }
709
710 protected ObjectName createObjectName(ProducerInfo producerInfo, String connectionClientId) throws MalformedObjectNameException {
711 // Build the object name for the producer info
712 Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
713
714 String destinationType = "destinationType=";
715 String destinationName = "destinationName=";
716
717 if (producerInfo.getDestination() == null) {
718 destinationType += "Dynamic";
719 destinationName = null;
720 } else {
721 destinationType += producerInfo.getDestination().getDestinationTypeAsString();
722 destinationName += JMXSupport.encodeObjectNamePart(producerInfo.getDestination().getPhysicalName());
723 }
724
725 String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
726 String producerId = "producerId=" + JMXSupport.encodeObjectNamePart(producerInfo.getProducerId().toString());
727
728 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
729 + "Type=Producer" + ","
730 + destinationType + ","
731 + (destinationName != null ? destinationName + "," : "")
732 + clientId + "," + producerId);
733 return objectName;
734 }
735
736 public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
737 ObjectName objectName = null;
738 try {
739 objectName = createObjectName(strategy);
740 if (!registeredMBeans.contains(objectName)) {
741 AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
742 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
743 registeredMBeans.add(objectName);
744 }
745 } catch (Exception e) {
746 LOG.warn("Failed to register MBean: " + strategy);
747 LOG.debug("Failure reason: " + e, e);
748 }
749 return objectName;
750 }
751
752 protected ObjectName createObjectName(XATransaction transaction) throws MalformedObjectNameException {
753 Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
754 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName")
755 + "," + "Type=RecoveredXaTransaction"
756 + "," + "Xid="
757 + JMXSupport.encodeObjectNamePart(transaction.getTransactionId().toString()));
758 return objectName;
759 }
760
761 public void registerRecoveredTransactionMBean(XATransaction transaction) {
762 try {
763 ObjectName objectName = createObjectName(transaction);
764 if (!registeredMBeans.contains(objectName)) {
765 RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
766 AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
767 registeredMBeans.add(objectName);
768 }
769 } catch (Exception e) {
770 LOG.warn("Failed to register prepared transaction MBean: " + transaction);
771 LOG.debug("Failure reason: " + e, e);
772 }
773 }
774
775 public void unregister(XATransaction transaction) {
776 try {
777 ObjectName objectName = createObjectName(transaction);
778 if (registeredMBeans.remove(objectName)) {
779 try {
780 managementContext.unregisterMBean(objectName);
781 } catch (Throwable e) {
782 LOG.warn("Failed to unregister MBean: " + objectName);
783 LOG.debug("Failure reason: " + e, e);
784 }
785 }
786 } catch (Exception e) {
787 LOG.warn("Failed to create object name to unregister " + transaction, e);
788 }
789 }
790
791 private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
792 Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
793 ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
794 + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName()));
795 return objectName;
796 }
797
798 public ObjectName getSubscriberObjectName(Subscription key) {
799 return subscriptionMap.get(key);
800 }
801
802 public Subscription getSubscriber(ObjectName key) {
803 Subscription sub = null;
804 for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
805 if (entry.getValue().equals(key)) {
806 sub = entry.getKey();
807 break;
808 }
809 }
810 return sub;
811 }
812 }