001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.util.HashMap;
021import java.util.Iterator;
022import java.util.Map;
023import java.util.Map.Entry;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.CopyOnWriteArraySet;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.ThreadPoolExecutor;
029
030import javax.jms.IllegalStateException;
031import javax.management.InstanceNotFoundException;
032import javax.management.MalformedObjectNameException;
033import javax.management.ObjectName;
034import javax.management.openmbean.CompositeData;
035import javax.management.openmbean.CompositeDataSupport;
036import javax.management.openmbean.CompositeType;
037import javax.management.openmbean.OpenDataException;
038import javax.management.openmbean.TabularData;
039import javax.management.openmbean.TabularDataSupport;
040import javax.management.openmbean.TabularType;
041
042import org.apache.activemq.broker.Broker;
043import org.apache.activemq.broker.BrokerService;
044import org.apache.activemq.broker.ConnectionContext;
045import org.apache.activemq.broker.ProducerBrokerExchange;
046import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
047import org.apache.activemq.broker.region.Destination;
048import org.apache.activemq.broker.region.DestinationFactory;
049import org.apache.activemq.broker.region.DestinationInterceptor;
050import org.apache.activemq.broker.region.DurableTopicSubscription;
051import org.apache.activemq.broker.region.MessageReference;
052import org.apache.activemq.broker.region.NullMessageReference;
053import org.apache.activemq.broker.region.Queue;
054import org.apache.activemq.broker.region.Region;
055import org.apache.activemq.broker.region.RegionBroker;
056import org.apache.activemq.broker.region.Subscription;
057import org.apache.activemq.broker.region.Topic;
058import org.apache.activemq.broker.region.TopicRegion;
059import org.apache.activemq.broker.region.TopicSubscription;
060import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
061import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
062import org.apache.activemq.command.ActiveMQDestination;
063import org.apache.activemq.command.ActiveMQMessage;
064import org.apache.activemq.command.ActiveMQTopic;
065import org.apache.activemq.command.ConnectionInfo;
066import org.apache.activemq.command.ConsumerInfo;
067import org.apache.activemq.command.Message;
068import org.apache.activemq.command.MessageAck;
069import org.apache.activemq.command.MessageId;
070import org.apache.activemq.command.ProducerInfo;
071import org.apache.activemq.command.SubscriptionInfo;
072import org.apache.activemq.thread.Scheduler;
073import org.apache.activemq.thread.TaskRunnerFactory;
074import org.apache.activemq.transaction.XATransaction;
075import org.apache.activemq.usage.SystemUsage;
076import org.apache.activemq.util.ServiceStopper;
077import org.apache.activemq.util.SubscriptionKey;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081public class ManagedRegionBroker extends RegionBroker {
    private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
    private final ManagementContext managementContext;
    private final ObjectName brokerObjectName;

    // Destination views keyed by their JMX object name; registerDestination picks the map
    // from the destination's queue/topic and temporary flags.
    private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();

    // Subscription views keyed by their JMX object name, bucketed by destination kind and durability.
    private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();

    // Producer views keyed by their JMX object name; producers created without a destination
    // go into dynamicDestinationProducers.
    private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();

    // Subscription key -> object name of the inactive durable subscription MBean (filled by addInactiveSubscription).
    private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
    // Subscription -> object name it was registered under (filled by registerSubscription).
    private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
    // Names of the MBeans registered by this broker; whatever is still listed is unregistered in doStop.
    private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
    /* This is the first broker in the broker interceptor chain. */
    private Broker contextBroker;

    // Executor passed to AsyncAnnotatedMBean.registerMBean; null when mbeanTimeout is not positive.
    private final ExecutorService asyncInvokeService;
    // Value of BrokerService.getMbeanInvocationTimeout() captured at construction time.
    private final long mbeanTimeout;
108
109    public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
110                               DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
111        super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
112        this.managementContext = context;
113        this.brokerObjectName = brokerObjectName;
114        this.mbeanTimeout = brokerService.getMbeanInvocationTimeout();
115        this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;;
116    }
117
118    @Override
119    public void start() throws Exception {
120        super.start();
121        // build all existing durable subscriptions
122        buildExistingSubscriptions();
123    }
124
125    @Override
126    protected void doStop(ServiceStopper stopper) {
127        super.doStop(stopper);
128        // lets remove any mbeans not yet removed
129        for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
130            ObjectName name = iter.next();
131            try {
132                managementContext.unregisterMBean(name);
133            } catch (InstanceNotFoundException e) {
134                LOG.warn("The MBean {} is no longer registered with JMX", name);
135            } catch (Exception e) {
136                stopper.onException(this, e);
137            }
138        }
139        registeredMBeans.clear();
140    }
141
142    @Override
143    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
144        return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
145    }
146
147    @Override
148    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
149        return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
150    }
151
152    @Override
153    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
154        return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
155    }
156
157    @Override
158    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
159        return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
160    }
161
162    public void register(ActiveMQDestination destName, Destination destination) {
163        // TODO refactor to allow views for custom destinations
164        try {
165            ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
166            DestinationView view;
167            if (destination instanceof Queue) {
168                view = new QueueView(this, (Queue)destination);
169            } else if (destination instanceof Topic) {
170                view = new TopicView(this, (Topic)destination);
171            } else {
172                view = null;
173                LOG.warn("JMX View is not supported for custom destination {}", destination);
174            }
175            if (view != null) {
176                registerDestination(objectName, destName, view);
177            }
178        } catch (Exception e) {
179            LOG.error("Failed to register destination {}", destName, e);
180        }
181    }
182
183    public void unregister(ActiveMQDestination destName) {
184        try {
185            ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
186            unregisterDestination(objectName);
187        } catch (Exception e) {
188            LOG.error("Failed to unregister {}", destName, e);
189        }
190    }
191
192    public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
193        String connectionClientId = context.getClientId();
194
195        SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
196        try {
197            ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, sub.getConsumerInfo());
198            SubscriptionView view;
199            if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
200                // add offline subscribers to inactive list
201                SubscriptionInfo info = new SubscriptionInfo();
202                info.setClientId(context.getClientId());
203                info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
204                info.setDestination(sub.getConsumerInfo().getDestination());
205                info.setSelector(sub.getSelector());
206                addInactiveSubscription(key, info, sub);
207            } else {
208                String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
209                if (sub.getConsumerInfo().isDurable()) {
210                    view = new DurableSubscriptionView(this, brokerService, context.getClientId(), userName, sub);
211                } else {
212                    if (sub instanceof TopicSubscription) {
213                        view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
214                    } else {
215                        view = new SubscriptionView(context.getClientId(), userName, sub);
216                    }
217                }
218                registerSubscription(objectName, sub.getConsumerInfo(), key, view);
219            }
220            subscriptionMap.put(sub, objectName);
221            return objectName;
222        } catch (Exception e) {
223            LOG.error("Failed to register subscription {}", sub, e);
224            return null;
225        }
226    }
227
228    @Override
229    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
230        super.addConnection(context, info);
231        this.contextBroker.getBrokerService().incrementCurrentConnections();
232        this.contextBroker.getBrokerService().incrementTotalConnections();
233    }
234
235    @Override
236    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
237        super.removeConnection(context, info, error);
238        this.contextBroker.getBrokerService().decrementCurrentConnections();
239    }
240
241    @Override
242    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
243        Subscription sub = super.addConsumer(context, info);
244        SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
245        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
246        if (inactiveName != null) {
247            // if it was inactive, register it
248            registerSubscription(context, sub);
249        }
250        return sub;
251    }
252
253    @Override
254    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
255        for (Subscription sub : subscriptionMap.keySet()) {
256            if (sub.getConsumerInfo().equals(info)) {
257               // unregister all consumer subs
258               unregisterSubscription(subscriptionMap.get(sub), true);
259            }
260        }
261        super.removeConsumer(context, info);
262    }
263
264    @Override
265    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
266        super.addProducer(context, info);
267        String connectionClientId = context.getClientId();
268        ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
269        String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
270        ProducerView view = new ProducerView(info, connectionClientId, userName, this);
271        registerProducer(objectName, info.getDestination(), view);
272    }
273
274    @Override
275    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
276        ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
277        unregisterProducer(objectName);
278        super.removeProducer(context, info);
279    }
280
281    @Override
282    public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
283        if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
284            ProducerInfo info = exchange.getProducerState().getInfo();
285            if (info.getDestination() == null && info.getProducerId() != null) {
286                ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, exchange.getConnectionContext().getClientId(), info);
287                ProducerView view = this.dynamicDestinationProducers.get(objectName);
288                if (view != null) {
289                    ActiveMQDestination dest = message.getDestination();
290                    if (dest != null) {
291                        view.setLastUsedDestinationName(dest);
292                    }
293                }
294            }
295         }
296        super.send(exchange, message);
297    }
298
299    public void unregisterSubscription(Subscription sub) {
300        ObjectName name = subscriptionMap.remove(sub);
301        if (name != null) {
302            try {
303                SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
304                ObjectName inactiveName = subscriptionKeys.remove(subscriptionKey);
305                if (inactiveName != null) {
306                    inactiveDurableTopicSubscribers.remove(inactiveName);
307                    managementContext.unregisterMBean(inactiveName);
308                }
309            } catch (Exception e) {
310                LOG.error("Failed to unregister subscription {}", sub, e);
311            }
312        }
313    }
314
315    protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
316        if (dest.isQueue()) {
317            if (dest.isTemporary()) {
318                temporaryQueues.put(key, view);
319            } else {
320                queues.put(key, view);
321            }
322        } else {
323            if (dest.isTemporary()) {
324                temporaryTopics.put(key, view);
325            } else {
326                topics.put(key, view);
327            }
328        }
329        try {
330            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
331            registeredMBeans.add(key);
332        } catch (Throwable e) {
333            LOG.warn("Failed to register MBean {}", key);
334            LOG.debug("Failure reason: ", e);
335        }
336    }
337
338    protected void unregisterDestination(ObjectName key) throws Exception {
339
340        DestinationView view = removeAndRemember(topics, key, null);
341        view = removeAndRemember(queues, key, view);
342        view = removeAndRemember(temporaryQueues, key, view);
343        view = removeAndRemember(temporaryTopics, key, view);
344        if (registeredMBeans.remove(key)) {
345            try {
346                managementContext.unregisterMBean(key);
347            } catch (Throwable e) {
348                LOG.warn("Failed to unregister MBean {}", key);
349                LOG.debug("Failure reason: ", e);
350            }
351        }
352        if (view != null) {
353            key = view.getSlowConsumerStrategy();
354            if (key!= null && registeredMBeans.remove(key)) {
355                try {
356                    managementContext.unregisterMBean(key);
357                } catch (Throwable e) {
358                    LOG.warn("Failed to unregister slow consumer strategy MBean {}", key);
359                    LOG.debug("Failure reason: ", e);
360                }
361            }
362        }
363    }
364
365    protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {
366
367        if (dest != null) {
368            if (dest.isQueue()) {
369                if (dest.isTemporary()) {
370                    temporaryQueueProducers.put(key, view);
371                } else {
372                    queueProducers.put(key, view);
373                }
374            } else {
375                if (dest.isTemporary()) {
376                    temporaryTopicProducers.put(key, view);
377                } else {
378                    topicProducers.put(key, view);
379                }
380            }
381        } else {
382            dynamicDestinationProducers.put(key, view);
383        }
384
385        try {
386            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
387            registeredMBeans.add(key);
388        } catch (Throwable e) {
389            LOG.warn("Failed to register MBean {}", key);
390            LOG.debug("Failure reason: ", e);
391        }
392    }
393
394    protected void unregisterProducer(ObjectName key) throws Exception {
395        queueProducers.remove(key);
396        topicProducers.remove(key);
397        temporaryQueueProducers.remove(key);
398        temporaryTopicProducers.remove(key);
399        dynamicDestinationProducers.remove(key);
400        if (registeredMBeans.remove(key)) {
401            try {
402                managementContext.unregisterMBean(key);
403            } catch (Throwable e) {
404                LOG.warn("Failed to unregister MBean {}", key);
405                LOG.debug("Failure reason: ", e);
406            }
407        }
408    }
409
410    private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
411        DestinationView candidate = map.remove(key);
412        if (candidate != null && view == null) {
413            view = candidate;
414        }
415        return candidate != null ? candidate : view;
416    }
417
418    protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
419        ActiveMQDestination dest = info.getDestination();
420        if (dest.isQueue()) {
421            if (dest.isTemporary()) {
422                temporaryQueueSubscribers.put(key, view);
423            } else {
424                queueSubscribers.put(key, view);
425            }
426        } else {
427            if (dest.isTemporary()) {
428                temporaryTopicSubscribers.put(key, view);
429            } else {
430                if (info.isDurable()) {
431                    durableTopicSubscribers.put(key, view);
432                    // unregister any inactive durable subs
433                    try {
434                        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
435                        if (inactiveName != null) {
436                            inactiveDurableTopicSubscribers.remove(inactiveName);
437                            registeredMBeans.remove(inactiveName);
438                            managementContext.unregisterMBean(inactiveName);
439                        }
440                    } catch (Throwable e) {
441                        LOG.error("Unable to unregister inactive durable subscriber {}", subscriptionKey, e);
442                    }
443                } else {
444                    topicSubscribers.put(key, view);
445                }
446            }
447        }
448
449        try {
450            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
451            registeredMBeans.add(key);
452        } catch (Throwable e) {
453            LOG.warn("Failed to register MBean {}", key);
454            LOG.debug("Failure reason: ", e);
455        }
456    }
457
458    protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
459        queueSubscribers.remove(key);
460        topicSubscribers.remove(key);
461        temporaryQueueSubscribers.remove(key);
462        temporaryTopicSubscribers.remove(key);
463        if (registeredMBeans.remove(key)) {
464            try {
465                managementContext.unregisterMBean(key);
466            } catch (Throwable e) {
467                LOG.warn("Failed to unregister MBean {}", key);
468                LOG.debug("Failure reason: ", e);
469            }
470        }
471        DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
472        if (view != null) {
473            // need to put this back in the inactive list
474            SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
475            if (addToInactive) {
476                SubscriptionInfo info = new SubscriptionInfo();
477                info.setClientId(subscriptionKey.getClientId());
478                info.setSubscriptionName(subscriptionKey.getSubscriptionName());
479                info.setDestination(new ActiveMQTopic(view.getDestinationName()));
480                info.setSelector(view.getSelector());
481                addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
482            }
483        }
484    }
485
486    protected void buildExistingSubscriptions() throws Exception {
487        Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
488        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
489        if (destinations != null) {
490            for (ActiveMQDestination dest : destinations) {
491                if (dest.isTopic()) {
492                    SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
493                    if (infos != null) {
494                        for (int i = 0; i < infos.length; i++) {
495                            SubscriptionInfo info = infos[i];
496                            SubscriptionKey key = new SubscriptionKey(info);
497                            if (!alreadyKnown(key)) {
498                                LOG.debug("Restoring durable subscription MBean {}", info);
499                                subscriptions.put(key, info);
500                            }
501                        }
502                    }
503                }
504            }
505        }
506
507        for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
508            addInactiveSubscription(entry.getKey(), entry.getValue(), null);
509        }
510    }
511
512    private boolean alreadyKnown(SubscriptionKey key) {
513        boolean known = false;
514        known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
515        LOG.trace("Sub with key: {}, {} already registered", key, (known ? "": "not"));
516        return known;
517    }
518
519    protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
520        try {
521            ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
522            ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo);
523            SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription);
524
525            try {
526                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
527                registeredMBeans.add(objectName);
528            } catch (Throwable e) {
529                LOG.warn("Failed to register MBean {}", key);
530                LOG.debug("Failure reason: ", e);
531            }
532
533            inactiveDurableTopicSubscribers.put(objectName, view);
534            subscriptionKeys.put(key, objectName);
535        } catch (Exception e) {
536            LOG.error("Failed to register subscription {}", info, e);
537        }
538    }
539
540    public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
541        Message[] messages = getSubscriberMessages(view);
542        CompositeData c[] = new CompositeData[messages.length];
543        for (int i = 0; i < c.length; i++) {
544            try {
545                c[i] = OpenTypeSupport.convert(messages[i]);
546            } catch (Throwable e) {
547                LOG.error("Failed to browse: {}", view, e);
548            }
549        }
550        return c;
551    }
552
553    public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
554        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
555        Message[] messages = getSubscriberMessages(view);
556        CompositeType ct = factory.getCompositeType();
557        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
558        TabularDataSupport rc = new TabularDataSupport(tt);
559        for (int i = 0; i < messages.length; i++) {
560            rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
561        }
562        return rc;
563    }
564
565    public void remove(SubscriptionView view, String messageId)  throws Exception {
566        ActiveMQDestination destination = getTopicDestination(view);
567        if (destination != null) {
568            final Destination topic = getTopicRegion().getDestinationMap().get(destination);
569            final MessageAck messageAck = new MessageAck();
570            messageAck.setMessageID(new MessageId(messageId));
571            messageAck.setDestination(destination);
572
573            topic.getMessageStore().removeMessage(brokerService.getAdminConnectionContext(), messageAck);
574
575            // if sub is active, remove from cursor
576            if (view.subscription instanceof DurableTopicSubscription) {
577                final DurableTopicSubscription durableTopicSubscription = (DurableTopicSubscription) view.subscription;
578                final MessageReference messageReference = new NullMessageReference();
579                messageReference.getMessage().setMessageId(messageAck.getFirstMessageId());
580                durableTopicSubscription.getPending().remove(messageReference);
581            }
582
583        } else {
584            throw new IllegalStateException("can't determine topic for sub:" + view);
585        }
586    }
587
588    protected Message[] getSubscriberMessages(SubscriptionView view) {
589        ActiveMQDestination destination = getTopicDestination(view);
590        if (destination != null) {
591            Destination topic = getTopicRegion().getDestinationMap().get(destination);
592            return topic.browse();
593
594        } else {
595            LOG.warn("can't determine topic to browse for sub:" + view);
596            return new Message[]{};
597        }
598    }
599
600    private ActiveMQDestination getTopicDestination(SubscriptionView view) {
601        ActiveMQDestination destination = null;
602        if (view.subscription instanceof DurableTopicSubscription) {
603            destination = new ActiveMQTopic(view.getDestinationName());
604        } else if (view instanceof InactiveDurableSubscriptionView) {
605            destination = ((InactiveDurableSubscriptionView)view).subscriptionInfo.getDestination();
606        }
607        return destination;
608    }
609
610    protected ObjectName[] getTopics() {
611        Set<ObjectName> set = topics.keySet();
612        return set.toArray(new ObjectName[set.size()]);
613    }
614
615    protected ObjectName[] getQueues() {
616        Set<ObjectName> set = queues.keySet();
617        return set.toArray(new ObjectName[set.size()]);
618    }
619
620    protected ObjectName[] getTemporaryTopics() {
621        Set<ObjectName> set = temporaryTopics.keySet();
622        return set.toArray(new ObjectName[set.size()]);
623    }
624
625    protected ObjectName[] getTemporaryQueues() {
626        Set<ObjectName> set = temporaryQueues.keySet();
627        return set.toArray(new ObjectName[set.size()]);
628    }
629
630    protected ObjectName[] getTopicSubscribers() {
631        Set<ObjectName> set = topicSubscribers.keySet();
632        return set.toArray(new ObjectName[set.size()]);
633    }
634
635    protected ObjectName[] getDurableTopicSubscribers() {
636        Set<ObjectName> set = durableTopicSubscribers.keySet();
637        return set.toArray(new ObjectName[set.size()]);
638    }
639
640    protected ObjectName[] getQueueSubscribers() {
641        Set<ObjectName> set = queueSubscribers.keySet();
642        return set.toArray(new ObjectName[set.size()]);
643    }
644
645    protected ObjectName[] getTemporaryTopicSubscribers() {
646        Set<ObjectName> set = temporaryTopicSubscribers.keySet();
647        return set.toArray(new ObjectName[set.size()]);
648    }
649
650    protected ObjectName[] getTemporaryQueueSubscribers() {
651        Set<ObjectName> set = temporaryQueueSubscribers.keySet();
652        return set.toArray(new ObjectName[set.size()]);
653    }
654
655    protected ObjectName[] getInactiveDurableTopicSubscribers() {
656        Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
657        return set.toArray(new ObjectName[set.size()]);
658    }
659
660    protected ObjectName[] getTopicProducers() {
661        Set<ObjectName> set = topicProducers.keySet();
662        return set.toArray(new ObjectName[set.size()]);
663    }
664
665    protected ObjectName[] getQueueProducers() {
666        Set<ObjectName> set = queueProducers.keySet();
667        return set.toArray(new ObjectName[set.size()]);
668    }
669
670    protected ObjectName[] getTemporaryTopicProducers() {
671        Set<ObjectName> set = temporaryTopicProducers.keySet();
672        return set.toArray(new ObjectName[set.size()]);
673    }
674
675    protected ObjectName[] getTemporaryQueueProducers() {
676        Set<ObjectName> set = temporaryQueueProducers.keySet();
677        return set.toArray(new ObjectName[set.size()]);
678    }
679
680    protected ObjectName[] getDynamicDestinationProducers() {
681        Set<ObjectName> set = dynamicDestinationProducers.keySet();
682        return set.toArray(new ObjectName[set.size()]);
683    }
684
685    public Broker getContextBroker() {
686        return contextBroker;
687    }
688
689    public void setContextBroker(Broker contextBroker) {
690        this.contextBroker = contextBroker;
691    }
692
693    public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
694        ObjectName objectName = null;
695        try {
696            objectName = BrokerMBeanSupport.createAbortSlowConsumerStrategyName(brokerObjectName, strategy);
697            if (!registeredMBeans.contains(objectName))  {
698
699                AbortSlowConsumerStrategyView view = null;
700                if (strategy instanceof AbortSlowAckConsumerStrategy) {
701                    view = new AbortSlowAckConsumerStrategyView(this, (AbortSlowAckConsumerStrategy) strategy);
702                } else {
703                    view = new AbortSlowConsumerStrategyView(this, strategy);
704                }
705
706                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
707                registeredMBeans.add(objectName);
708            }
709        } catch (Exception e) {
710            LOG.warn("Failed to register MBean {}", strategy);
711            LOG.debug("Failure reason: ", e);
712        }
713        return objectName;
714    }
715
716    public void registerRecoveredTransactionMBean(XATransaction transaction) {
717        try {
718            ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
719            if (!registeredMBeans.contains(objectName))  {
720                RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
721                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
722                registeredMBeans.add(objectName);
723            }
724        } catch (Exception e) {
725            LOG.warn("Failed to register prepared transaction MBean {}", transaction);
726            LOG.debug("Failure reason: ", e);
727        }
728    }
729
730    public void unregister(XATransaction transaction) {
731        try {
732            ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
733            if (registeredMBeans.remove(objectName)) {
734                try {
735                    managementContext.unregisterMBean(objectName);
736                } catch (Throwable e) {
737                    LOG.warn("Failed to unregister MBean {}", objectName);
738                    LOG.debug("Failure reason: ", e);
739                }
740            }
741        } catch (Exception e) {
742            LOG.warn("Failed to create object name to unregister {}", transaction, e);
743        }
744    }
745
746    public ObjectName getSubscriberObjectName(Subscription key) {
747        return subscriptionMap.get(key);
748    }
749
750    public Subscription getSubscriber(ObjectName key) {
751        Subscription sub = null;
752        for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
753            if (entry.getValue().equals(key)) {
754                sub = entry.getKey();
755                break;
756            }
757        }
758        return sub;
759    }
760
761    public Map<ObjectName, DestinationView> getQueueViews() {
762        return queues;
763    }
764}