001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.util.*;
021import java.util.Map.Entry;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.CopyOnWriteArraySet;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.ThreadPoolExecutor;
026
027import javax.jms.IllegalStateException;
028import javax.management.InstanceNotFoundException;
029import javax.management.MalformedObjectNameException;
030import javax.management.ObjectName;
031import javax.management.openmbean.CompositeData;
032import javax.management.openmbean.CompositeDataSupport;
033import javax.management.openmbean.CompositeType;
034import javax.management.openmbean.OpenDataException;
035import javax.management.openmbean.TabularData;
036import javax.management.openmbean.TabularDataSupport;
037import javax.management.openmbean.TabularType;
038
039import org.apache.activemq.broker.Broker;
040import org.apache.activemq.broker.BrokerService;
041import org.apache.activemq.broker.ConnectionContext;
042import org.apache.activemq.broker.ProducerBrokerExchange;
043import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
044import org.apache.activemq.broker.region.Destination;
045import org.apache.activemq.broker.region.DestinationFactory;
046import org.apache.activemq.broker.region.DestinationInterceptor;
047import org.apache.activemq.broker.region.DurableTopicSubscription;
048import org.apache.activemq.broker.region.MessageReference;
049import org.apache.activemq.broker.region.NullMessageReference;
050import org.apache.activemq.broker.region.Queue;
051import org.apache.activemq.broker.region.Region;
052import org.apache.activemq.broker.region.RegionBroker;
053import org.apache.activemq.broker.region.Subscription;
054import org.apache.activemq.broker.region.Topic;
055import org.apache.activemq.broker.region.TopicRegion;
056import org.apache.activemq.broker.region.TopicSubscription;
057import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
058import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
059import org.apache.activemq.command.ActiveMQDestination;
060import org.apache.activemq.command.ActiveMQMessage;
061import org.apache.activemq.command.ActiveMQTopic;
062import org.apache.activemq.command.ConnectionInfo;
063import org.apache.activemq.command.ConsumerInfo;
064import org.apache.activemq.command.Message;
065import org.apache.activemq.command.MessageAck;
066import org.apache.activemq.command.MessageId;
067import org.apache.activemq.command.ProducerInfo;
068import org.apache.activemq.command.SubscriptionInfo;
069import org.apache.activemq.thread.Scheduler;
070import org.apache.activemq.thread.TaskRunnerFactory;
071import org.apache.activemq.transaction.XATransaction;
072import org.apache.activemq.usage.SystemUsage;
073import org.apache.activemq.util.ServiceStopper;
074import org.apache.activemq.util.SubscriptionKey;
075import org.slf4j.Logger;
076import org.slf4j.LoggerFactory;
077
078public class ManagedRegionBroker extends RegionBroker {
079    private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
       // Collaborators supplied through the constructor.
080    private final ManagementContext managementContext;
081    private final ObjectName brokerObjectName;
       // Destination views keyed by MBean name; registerDestination picks the map from the
       // destination's queue/topic and temporary flags.
082    private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
083    private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
084    private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
085    private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
       // Subscription views keyed by MBean name; filled by registerSubscription(ObjectName, ...),
       // except inactiveDurableTopicSubscribers, which addInactiveSubscription fills.
086    private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
087    private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
088    private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
089    private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
090    private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
091    private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
       // Producer views keyed by MBean name; registerProducer puts producers whose destination
       // is null into dynamicDestinationProducers.
092    private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
093    private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
094    private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
095    private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
096    private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
       // Subscription key -> MBean name of the inactive durable subscription view that
       // addInactiveSubscription registered for it.
097    private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
       // Subscription -> MBean name recorded by registerSubscription(ConnectionContext, Subscription).
098    private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
       // Names added after a successful MBean registration; doStop unregisters whatever is left.
099    private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
100    /* This is the first broker in the broker interceptor chain. */
101    private Broker contextBroker;
102
       // Executor handed to AsyncAnnotatedMBean.registerMBean; the constructor leaves it null
       // unless mbeanTimeout is greater than zero.
103    private final ExecutorService asyncInvokeService;
       // Value of BrokerService.getMbeanInvocationTimeout() captured in the constructor.
104    private final long mbeanTimeout;
105
106    public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
107                               DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
108        super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
109        this.managementContext = context;
110        this.brokerObjectName = brokerObjectName;
111        this.mbeanTimeout = brokerService.getMbeanInvocationTimeout();
112        this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;;
113    }
114
115    @Override
116    public void start() throws Exception {
117        super.start();
118        // build all existing durable subscriptions
119        buildExistingSubscriptions();
120    }
121
122    @Override
123    protected void doStop(ServiceStopper stopper) {
124        super.doStop(stopper);
125        // lets remove any mbeans not yet removed
126        for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
127            ObjectName name = iter.next();
128            try {
129                managementContext.unregisterMBean(name);
130            } catch (InstanceNotFoundException e) {
131                LOG.warn("The MBean {} is no longer registered with JMX", name);
132            } catch (Exception e) {
133                stopper.onException(this, e);
134            }
135        }
136        registeredMBeans.clear();
137    }
138
139    @Override
140    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
141        return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
142    }
143
144    @Override
145    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
146        return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
147    }
148
149    @Override
150    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
151        return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
152    }
153
154    @Override
155    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
156        return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
157    }
158
159    public void register(ActiveMQDestination destName, Destination destination) {
160        // TODO refactor to allow views for custom destinations
161        try {
162            ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
163            DestinationView view;
164            if (destination instanceof Queue) {
165                view = new QueueView(this, (Queue)destination);
166            } else if (destination instanceof Topic) {
167                view = new TopicView(this, (Topic)destination);
168            } else {
169                view = null;
170                LOG.warn("JMX View is not supported for custom destination {}", destination);
171            }
172            if (view != null) {
173                registerDestination(objectName, destName, view);
174            }
175        } catch (Exception e) {
176            LOG.error("Failed to register destination {}", destName, e);
177        }
178    }
179
180    public void unregister(ActiveMQDestination destName) {
181        try {
182            ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
183            unregisterDestination(objectName);
184        } catch (Exception e) {
185            LOG.error("Failed to unregister {}", destName, e);
186        }
187    }
188
189    public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
190        String connectionClientId = context.getClientId();
191
192        SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
193        try {
194            ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, sub.getConsumerInfo());
195            SubscriptionView view;
196            if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
197                // add offline subscribers to inactive list
198                SubscriptionInfo info = new SubscriptionInfo();
199                info.setClientId(context.getClientId());
200                info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
201                info.setDestination(sub.getConsumerInfo().getDestination());
202                info.setSelector(sub.getSelector());
203                addInactiveSubscription(key, info, sub);
204            } else {
205                String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
206                if (sub.getConsumerInfo().isDurable()) {
207                    view = new DurableSubscriptionView(this, brokerService, context.getClientId(), userName, sub);
208                } else {
209                    if (sub instanceof TopicSubscription) {
210                        view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
211                    } else {
212                        view = new SubscriptionView(context.getClientId(), userName, sub);
213                    }
214                }
215                registerSubscription(objectName, sub.getConsumerInfo(), key, view);
216            }
217            subscriptionMap.put(sub, objectName);
218            return objectName;
219        } catch (Exception e) {
220            LOG.error("Failed to register subscription {}", sub, e);
221            return null;
222        }
223    }
224
225    @Override
226    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
227        super.addConnection(context, info);
228        this.contextBroker.getBrokerService().incrementCurrentConnections();
229        this.contextBroker.getBrokerService().incrementTotalConnections();
230    }
231
232    @Override
233    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
234        super.removeConnection(context, info, error);
235        this.contextBroker.getBrokerService().decrementCurrentConnections();
236    }
237
238    @Override
239    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
240        Subscription sub = super.addConsumer(context, info);
241        SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
242        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
243        if (inactiveName != null) {
244            // if it was inactive, register it
245            registerSubscription(context, sub);
246        }
247        return sub;
248    }
249
250    @Override
251    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
252        for (Subscription sub : subscriptionMap.keySet()) {
253            if (sub.getConsumerInfo().equals(info)) {
254               // unregister all consumer subs
255               unregisterSubscription(subscriptionMap.get(sub), true);
256            }
257        }
258        super.removeConsumer(context, info);
259    }
260
261    @Override
262    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
263        super.addProducer(context, info);
264        String connectionClientId = context.getClientId();
265        ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
266        String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
267        ProducerView view = new ProducerView(info, connectionClientId, userName, this);
268        registerProducer(objectName, info.getDestination(), view);
269    }
270
271    @Override
272    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
273        ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
274        unregisterProducer(objectName);
275        super.removeProducer(context, info);
276    }
277
278    @Override
279    public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
280        if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
281            ProducerInfo info = exchange.getProducerState().getInfo();
282            if (info.getDestination() == null && info.getProducerId() != null) {
283                ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, exchange.getConnectionContext().getClientId(), info);
284                ProducerView view = this.dynamicDestinationProducers.get(objectName);
285                if (view != null) {
286                    ActiveMQDestination dest = message.getDestination();
287                    if (dest != null) {
288                        view.setLastUsedDestinationName(dest);
289                    }
290                }
291            }
292         }
293        super.send(exchange, message);
294    }
295
296    public void unregisterSubscription(Subscription sub) {
297        ObjectName name = subscriptionMap.remove(sub);
298        if (name != null) {
299            try {
300                SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
301                ObjectName inactiveName = subscriptionKeys.remove(subscriptionKey);
302                if (inactiveName != null) {
303                    inactiveDurableTopicSubscribers.remove(inactiveName);
304                    managementContext.unregisterMBean(inactiveName);
305                }
306            } catch (Exception e) {
307                LOG.error("Failed to unregister subscription {}", sub, e);
308            }
309        }
310    }
311
312    protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
313        if (dest.isQueue()) {
314            if (dest.isTemporary()) {
315                temporaryQueues.put(key, view);
316            } else {
317                queues.put(key, view);
318            }
319        } else {
320            if (dest.isTemporary()) {
321                temporaryTopics.put(key, view);
322            } else {
323                topics.put(key, view);
324            }
325        }
326        try {
327            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
328            registeredMBeans.add(key);
329        } catch (Throwable e) {
330            LOG.warn("Failed to register MBean {}", key);
331            LOG.debug("Failure reason: ", e);
332        }
333    }
334
335    protected void unregisterDestination(ObjectName key) throws Exception {
336
337        DestinationView view = removeAndRemember(topics, key, null);
338        view = removeAndRemember(queues, key, view);
339        view = removeAndRemember(temporaryQueues, key, view);
340        view = removeAndRemember(temporaryTopics, key, view);
341        if (registeredMBeans.remove(key)) {
342            try {
343                managementContext.unregisterMBean(key);
344            } catch (Throwable e) {
345                LOG.warn("Failed to unregister MBean {}", key);
346                LOG.debug("Failure reason: ", e);
347            }
348        }
349        if (view != null) {
350            key = view.getSlowConsumerStrategy();
351            if (key!= null && registeredMBeans.remove(key)) {
352                try {
353                    managementContext.unregisterMBean(key);
354                } catch (Throwable e) {
355                    LOG.warn("Failed to unregister slow consumer strategy MBean {}", key);
356                    LOG.debug("Failure reason: ", e);
357                }
358            }
359        }
360    }
361
362    protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {
363
364        if (dest != null) {
365            if (dest.isQueue()) {
366                if (dest.isTemporary()) {
367                    temporaryQueueProducers.put(key, view);
368                } else {
369                    queueProducers.put(key, view);
370                }
371            } else {
372                if (dest.isTemporary()) {
373                    temporaryTopicProducers.put(key, view);
374                } else {
375                    topicProducers.put(key, view);
376                }
377            }
378        } else {
379            dynamicDestinationProducers.put(key, view);
380        }
381
382        try {
383            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
384            registeredMBeans.add(key);
385        } catch (Throwable e) {
386            LOG.warn("Failed to register MBean {}", key);
387            LOG.debug("Failure reason: ", e);
388        }
389    }
390
391    protected void unregisterProducer(ObjectName key) throws Exception {
392        queueProducers.remove(key);
393        topicProducers.remove(key);
394        temporaryQueueProducers.remove(key);
395        temporaryTopicProducers.remove(key);
396        dynamicDestinationProducers.remove(key);
397        if (registeredMBeans.remove(key)) {
398            try {
399                managementContext.unregisterMBean(key);
400            } catch (Throwable e) {
401                LOG.warn("Failed to unregister MBean {}", key);
402                LOG.debug("Failure reason: ", e);
403            }
404        }
405    }
406
407    private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
408        DestinationView candidate = map.remove(key);
409        if (candidate != null && view == null) {
410            view = candidate;
411        }
412        return candidate != null ? candidate : view;
413    }
414
415    protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
416        ActiveMQDestination dest = info.getDestination();
417        if (dest.isQueue()) {
418            if (dest.isTemporary()) {
419                temporaryQueueSubscribers.put(key, view);
420            } else {
421                queueSubscribers.put(key, view);
422            }
423        } else {
424            if (dest.isTemporary()) {
425                temporaryTopicSubscribers.put(key, view);
426            } else {
427                if (info.isDurable()) {
428                    durableTopicSubscribers.put(key, view);
429                    // unregister any inactive durable subs
430                    try {
431                        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
432                        if (inactiveName != null) {
433                            inactiveDurableTopicSubscribers.remove(inactiveName);
434                            registeredMBeans.remove(inactiveName);
435                            managementContext.unregisterMBean(inactiveName);
436                        }
437                    } catch (Throwable e) {
438                        LOG.error("Unable to unregister inactive durable subscriber {}", subscriptionKey, e);
439                    }
440                } else {
441                    topicSubscribers.put(key, view);
442                }
443            }
444        }
445
446        try {
447            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
448            registeredMBeans.add(key);
449        } catch (Throwable e) {
450            LOG.warn("Failed to register MBean {}", key);
451            LOG.debug("Failure reason: ", e);
452        }
453    }
454
455    protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
456        queueSubscribers.remove(key);
457        topicSubscribers.remove(key);
458        temporaryQueueSubscribers.remove(key);
459        temporaryTopicSubscribers.remove(key);
460        if (registeredMBeans.remove(key)) {
461            try {
462                managementContext.unregisterMBean(key);
463            } catch (Throwable e) {
464                LOG.warn("Failed to unregister MBean {}", key);
465                LOG.debug("Failure reason: ", e);
466            }
467        }
468        DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
469        if (view != null) {
470            // need to put this back in the inactive list
471            SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
472            if (addToInactive) {
473                SubscriptionInfo info = new SubscriptionInfo();
474                info.setClientId(subscriptionKey.getClientId());
475                info.setSubscriptionName(subscriptionKey.getSubscriptionName());
476                info.setDestination(new ActiveMQTopic(view.getDestinationName()));
477                info.setSelector(view.getSelector());
478                addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
479            }
480        }
481    }
482
483    protected void buildExistingSubscriptions() throws Exception {
484        Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
485        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
486        if (destinations != null) {
487            for (ActiveMQDestination dest : destinations) {
488                if (dest.isTopic()) {
489                    SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
490                    if (infos != null) {
491                        for (int i = 0; i < infos.length; i++) {
492                            SubscriptionInfo info = infos[i];
493                            SubscriptionKey key = new SubscriptionKey(info);
494                            if (!alreadyKnown(key)) {
495                                LOG.debug("Restoring durable subscription MBean {}", info);
496                                subscriptions.put(key, info);
497                            }
498                        }
499                    }
500                }
501            }
502        }
503
504        for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
505            addInactiveSubscription(entry.getKey(), entry.getValue(), null);
506        }
507    }
508
509    private boolean alreadyKnown(SubscriptionKey key) {
510        boolean known = false;
511        known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
512        LOG.trace("Sub with key: {}, {} already registered", key, (known ? "": "not"));
513        return known;
514    }
515
516    protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
517        try {
518            ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
519            ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo);
520            SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription);
521
522            try {
523                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
524                registeredMBeans.add(objectName);
525            } catch (Throwable e) {
526                LOG.warn("Failed to register MBean {}", key);
527                LOG.debug("Failure reason: ", e);
528            }
529
530            inactiveDurableTopicSubscribers.put(objectName, view);
531            subscriptionKeys.put(key, objectName);
532        } catch (Exception e) {
533            LOG.error("Failed to register subscription {}", info, e);
534        }
535    }
536
537    public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
538        Message[] messages = getSubscriberMessages(view);
539        CompositeData c[] = new CompositeData[messages.length];
540        for (int i = 0; i < c.length; i++) {
541            try {
542                c[i] = OpenTypeSupport.convert(messages[i]);
543            } catch (Throwable e) {
544                LOG.error("Failed to browse: {}", view, e);
545            }
546        }
547        return c;
548    }
549
550    public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
551        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
552        Message[] messages = getSubscriberMessages(view);
553        CompositeType ct = factory.getCompositeType();
554        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
555        TabularDataSupport rc = new TabularDataSupport(tt);
556        for (int i = 0; i < messages.length; i++) {
557            rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
558        }
559        return rc;
560    }
561
562    public void remove(SubscriptionView view, String messageId)  throws Exception {
563        ActiveMQDestination destination = getTopicDestination(view);
564        if (destination != null) {
565            final Destination topic = getTopicRegion().getDestinationMap().get(destination);
566            final MessageAck messageAck = new MessageAck();
567            messageAck.setMessageID(new MessageId(messageId));
568            messageAck.setDestination(destination);
569
570            topic.getMessageStore().removeMessage(brokerService.getAdminConnectionContext(), messageAck);
571
572            // if sub is active, remove from cursor
573            if (view.subscription instanceof DurableTopicSubscription) {
574                final DurableTopicSubscription durableTopicSubscription = (DurableTopicSubscription) view.subscription;
575                final MessageReference messageReference = new NullMessageReference();
576                messageReference.getMessage().setMessageId(messageAck.getFirstMessageId());
577                durableTopicSubscription.getPending().remove(messageReference);
578            }
579
580        } else {
581            throw new IllegalStateException("can't determine topic for sub:" + view);
582        }
583    }
584
585    protected Message[] getSubscriberMessages(SubscriptionView view) {
586        ActiveMQDestination destination = getTopicDestination(view);
587        if (destination != null) {
588            Destination topic = getTopicRegion().getDestinationMap().get(destination);
589            return topic.browse();
590
591        } else {
592            LOG.warn("can't determine topic to browse for sub:" + view);
593            return new Message[]{};
594        }
595    }
596
597    private ActiveMQDestination getTopicDestination(SubscriptionView view) {
598        ActiveMQDestination destination = null;
599        if (view.subscription instanceof DurableTopicSubscription) {
600            destination = new ActiveMQTopic(view.getDestinationName());
601        } else if (view instanceof InactiveDurableSubscriptionView) {
602            destination = ((InactiveDurableSubscriptionView)view).subscriptionInfo.getDestination();
603        }
604        return destination;
605    }
606
607    private ObjectName[] onlyNonSuppressed (Set<ObjectName> set){
608        List<ObjectName> nonSuppressed = new ArrayList<ObjectName>();
609        for(ObjectName key : set){
610            if (managementContext.isAllowedToRegister(key)){
611                nonSuppressed.add(key);
612            }
613        }
614        return nonSuppressed.toArray(new ObjectName[nonSuppressed.size()]);
615    }
616
617    protected ObjectName[] getTopics() {
618        Set<ObjectName> set = topics.keySet();
619        return set.toArray(new ObjectName[set.size()]);
620    }
621
622    protected ObjectName[] getTopicsNonSuppressed() {
623        return onlyNonSuppressed(topics.keySet());
624    }
625
626    protected ObjectName[] getQueues() {
627        Set<ObjectName> set = queues.keySet();
628        return set.toArray(new ObjectName[set.size()]);
629    }
630
631    protected ObjectName[] getQueuesNonSuppressed() {
632        return onlyNonSuppressed(queues.keySet());
633    }
634
635    protected ObjectName[] getTemporaryTopics() {
636        Set<ObjectName> set = temporaryTopics.keySet();
637        return set.toArray(new ObjectName[set.size()]);
638    }
639
640    protected ObjectName[] getTemporaryTopicsNonSuppressed() {
641        return onlyNonSuppressed(temporaryTopics.keySet());
642    }
643
644    protected ObjectName[] getTemporaryQueues() {
645        Set<ObjectName> set = temporaryQueues.keySet();
646        return set.toArray(new ObjectName[set.size()]);
647    }
648
649    protected ObjectName[] getTemporaryQueuesNonSuppressed() {
650        return onlyNonSuppressed(temporaryQueues.keySet());
651    }
652
653    protected ObjectName[] getTopicSubscribers() {
654        Set<ObjectName> set = topicSubscribers.keySet();
655        return set.toArray(new ObjectName[set.size()]);
656    }
657
658    protected ObjectName[] getTopicSubscribersNonSuppressed() {
659        return onlyNonSuppressed(topicSubscribers.keySet());
660    }
661
662    protected ObjectName[] getDurableTopicSubscribers() {
663        Set<ObjectName> set = durableTopicSubscribers.keySet();
664        return set.toArray(new ObjectName[set.size()]);
665    }
666
667    protected ObjectName[] getDurableTopicSubscribersNonSuppressed() {
668        return onlyNonSuppressed(durableTopicSubscribers.keySet());
669    }
670
671    protected ObjectName[] getQueueSubscribers() {
672        Set<ObjectName> set = queueSubscribers.keySet();
673        return set.toArray(new ObjectName[set.size()]);
674    }
675
676    protected ObjectName[] getQueueSubscribersNonSuppressed() {
677        return onlyNonSuppressed(queueSubscribers.keySet());
678    }
679
680    protected ObjectName[] getTemporaryTopicSubscribers() {
681        Set<ObjectName> set = temporaryTopicSubscribers.keySet();
682        return set.toArray(new ObjectName[set.size()]);
683    }
684
685    protected ObjectName[] getTemporaryTopicSubscribersNonSuppressed() {
686        return onlyNonSuppressed(temporaryTopicSubscribers.keySet());
687    }
688
689    protected ObjectName[] getTemporaryQueueSubscribers() {
690        Set<ObjectName> set = temporaryQueueSubscribers.keySet();
691        return set.toArray(new ObjectName[set.size()]);
692    }
693
694    protected ObjectName[] getTemporaryQueueSubscribersNonSuppressed() {
695        return onlyNonSuppressed(temporaryQueueSubscribers.keySet());
696    }
697
698    protected ObjectName[] getInactiveDurableTopicSubscribers() {
699        Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
700        return set.toArray(new ObjectName[set.size()]);
701    }
702
703    protected ObjectName[] getInactiveDurableTopicSubscribersNonSuppressed() {
704        return onlyNonSuppressed(inactiveDurableTopicSubscribers.keySet());
705    }
706
707    protected ObjectName[] getTopicProducers() {
708        Set<ObjectName> set = topicProducers.keySet();
709        return set.toArray(new ObjectName[set.size()]);
710    }
711
712    protected ObjectName[] getTopicProducersNonSuppressed() {
713        return onlyNonSuppressed(topicProducers.keySet());
714    }
715
716    protected ObjectName[] getQueueProducers() {
717        Set<ObjectName> set = queueProducers.keySet();
718        return set.toArray(new ObjectName[set.size()]);
719    }
720
721    protected ObjectName[] getQueueProducersNonSuppressed() {
722        return onlyNonSuppressed(queueProducers.keySet());
723    }
724
725    protected ObjectName[] getTemporaryTopicProducers() {
726        Set<ObjectName> set = temporaryTopicProducers.keySet();
727        return set.toArray(new ObjectName[set.size()]);
728    }
729
730    protected ObjectName[] getTemporaryTopicProducersNonSuppressed() {
731        return onlyNonSuppressed(temporaryTopicProducers.keySet());
732    }
733
734    protected ObjectName[] getTemporaryQueueProducers() {
735        Set<ObjectName> set = temporaryQueueProducers.keySet();
736        return set.toArray(new ObjectName[set.size()]);
737    }
738
739    protected ObjectName[] getTemporaryQueueProducersNonSuppressed() {
740        return onlyNonSuppressed(temporaryQueueProducers.keySet());
741    }
742
743    protected ObjectName[] getDynamicDestinationProducers() {
744        Set<ObjectName> set = dynamicDestinationProducers.keySet();
745        return set.toArray(new ObjectName[set.size()]);
746    }
747
748    protected ObjectName[] getDynamicDestinationProducersNonSuppressed() {
749        return onlyNonSuppressed(dynamicDestinationProducers.keySet());
750    }
751
752    public Broker getContextBroker() {
753        return contextBroker;
754    }
755
    /**
     * Stores the given broker in the {@code contextBroker} field.
     *
     * @param contextBroker the broker to store
     */
    public void setContextBroker(Broker contextBroker) {
        this.contextBroker = contextBroker;
    }
759
760    public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
761        ObjectName objectName = null;
762        try {
763            objectName = BrokerMBeanSupport.createAbortSlowConsumerStrategyName(brokerObjectName, strategy);
764            if (!registeredMBeans.contains(objectName))  {
765
766                AbortSlowConsumerStrategyView view = null;
767                if (strategy instanceof AbortSlowAckConsumerStrategy) {
768                    view = new AbortSlowAckConsumerStrategyView(this, (AbortSlowAckConsumerStrategy) strategy);
769                } else {
770                    view = new AbortSlowConsumerStrategyView(this, strategy);
771                }
772
773                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
774                registeredMBeans.add(objectName);
775            }
776        } catch (Exception e) {
777            LOG.warn("Failed to register MBean {}", strategy);
778            LOG.debug("Failure reason: ", e);
779        }
780        return objectName;
781    }
782
783    public void registerRecoveredTransactionMBean(XATransaction transaction) {
784        try {
785            ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
786            if (!registeredMBeans.contains(objectName))  {
787                RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
788                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
789                registeredMBeans.add(objectName);
790            }
791        } catch (Exception e) {
792            LOG.warn("Failed to register prepared transaction MBean {}", transaction);
793            LOG.debug("Failure reason: ", e);
794        }
795    }
796
797    public void unregister(XATransaction transaction) {
798        try {
799            ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
800            if (registeredMBeans.remove(objectName)) {
801                try {
802                    managementContext.unregisterMBean(objectName);
803                } catch (Throwable e) {
804                    LOG.warn("Failed to unregister MBean {}", objectName);
805                    LOG.debug("Failure reason: ", e);
806                }
807            }
808        } catch (Exception e) {
809            LOG.warn("Failed to create object name to unregister {}", transaction, e);
810        }
811    }
812
813    public ObjectName getSubscriberObjectName(Subscription key) {
814        return subscriptionMap.get(key);
815    }
816
817    public Subscription getSubscriber(ObjectName key) {
818        Subscription sub = null;
819        for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
820            if (entry.getValue().equals(key)) {
821                sub = entry.getKey();
822                break;
823            }
824        }
825        return sub;
826    }
827
828    public Map<ObjectName, DestinationView> getQueueViews() {
829        return queues;
830    }
831}