001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.Map.Entry;
026    import java.util.Set;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArraySet;
029    import java.util.concurrent.ExecutorService;
030    import java.util.concurrent.ThreadPoolExecutor;
031    
032    import javax.management.InstanceNotFoundException;
033    import javax.management.MalformedObjectNameException;
034    import javax.management.ObjectName;
035    import javax.management.openmbean.CompositeData;
036    import javax.management.openmbean.CompositeDataSupport;
037    import javax.management.openmbean.CompositeType;
038    import javax.management.openmbean.OpenDataException;
039    import javax.management.openmbean.TabularData;
040    import javax.management.openmbean.TabularDataSupport;
041    import javax.management.openmbean.TabularType;
042    
043    import org.apache.activemq.broker.Broker;
044    import org.apache.activemq.broker.BrokerService;
045    import org.apache.activemq.broker.ConnectionContext;
046    import org.apache.activemq.broker.ProducerBrokerExchange;
047    import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
048    import org.apache.activemq.broker.region.Destination;
049    import org.apache.activemq.broker.region.DestinationFactory;
050    import org.apache.activemq.broker.region.DestinationFactoryImpl;
051    import org.apache.activemq.broker.region.DestinationInterceptor;
052    import org.apache.activemq.broker.region.Queue;
053    import org.apache.activemq.broker.region.Region;
054    import org.apache.activemq.broker.region.RegionBroker;
055    import org.apache.activemq.broker.region.Subscription;
056    import org.apache.activemq.broker.region.Topic;
057    import org.apache.activemq.broker.region.TopicRegion;
058    import org.apache.activemq.broker.region.TopicSubscription;
059    import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
060    import org.apache.activemq.command.ActiveMQDestination;
061    import org.apache.activemq.command.ActiveMQMessage;
062    import org.apache.activemq.command.ActiveMQTopic;
063    import org.apache.activemq.command.ConsumerInfo;
064    import org.apache.activemq.command.Message;
065    import org.apache.activemq.command.MessageId;
066    import org.apache.activemq.command.ProducerInfo;
067    import org.apache.activemq.command.SubscriptionInfo;
068    import org.apache.activemq.store.MessageRecoveryListener;
069    import org.apache.activemq.store.PersistenceAdapter;
070    import org.apache.activemq.store.TopicMessageStore;
071    import org.apache.activemq.thread.Scheduler;
072    import org.apache.activemq.thread.TaskRunnerFactory;
073    import org.apache.activemq.transaction.XATransaction;
074    import org.apache.activemq.usage.SystemUsage;
075    import org.apache.activemq.util.ServiceStopper;
076    import org.apache.activemq.util.SubscriptionKey;
077    import org.slf4j.Logger;
078    import org.slf4j.LoggerFactory;
079    
080    public class ManagedRegionBroker extends RegionBroker {
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
    /** Context through which every MBean created by this broker is registered and unregistered. */
    private final ManagementContext managementContext;
    /** JMX name of the owning broker; handed to BrokerMBeanSupport when building child object names. */
    private final ObjectName brokerObjectName;

    // Destination views keyed by their JMX object name, one map per destination kind.
    private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();

    // Subscription views keyed by their JMX object name, one map per subscriber kind.
    private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();

    // Producer views keyed by their JMX object name; producers without a fixed destination
    // are kept in dynamicDestinationProducers.
    private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();

    /** Maps a (client id, subscription name) key to the JMX name of its inactive durable subscription MBean. */
    private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
    /** Maps each registered subscription to the JMX name it was registered under. */
    private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
    /** Names of the MBeans this broker has registered and not yet unregistered; swept in doStop. */
    private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
    /* This is the first broker in the broker interceptor chain. */
    private Broker contextBroker;

    /** Executor passed to AsyncAnnotatedMBean on registration; null when mbeanTimeout is not positive. */
    private final ExecutorService asyncInvokeService;
    /** MBean invocation timeout read from the BrokerService at construction time. */
    private final long mbeanTimeout;
107    
108        public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
109                                   DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
110            super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
111            this.managementContext = context;
112            this.brokerObjectName = brokerObjectName;
113            this.mbeanTimeout = brokerService.getMbeanInvocationTimeout();
114            this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;;
115        }
116    
117        @Override
118        public void start() throws Exception {
119            super.start();
120            // build all existing durable subscriptions
121            buildExistingSubscriptions();
122        }
123    
124        @Override
125        protected void doStop(ServiceStopper stopper) {
126            super.doStop(stopper);
127            // lets remove any mbeans not yet removed
128            for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
129                ObjectName name = iter.next();
130                try {
131                    managementContext.unregisterMBean(name);
132                } catch (InstanceNotFoundException e) {
133                    LOG.warn("The MBean: " + name + " is no longer registered with JMX");
134                } catch (Exception e) {
135                    stopper.onException(this, e);
136                }
137            }
138            registeredMBeans.clear();
139        }
140    
141        @Override
142        protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
143            return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
144        }
145    
146        @Override
147        protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
148            return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
149        }
150    
151        @Override
152        protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
153            return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
154        }
155    
156        @Override
157        protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
158            return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
159        }
160    
161        public void register(ActiveMQDestination destName, Destination destination) {
162            // TODO refactor to allow views for custom destinations
163            try {
164                ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
165                DestinationView view;
166                if (destination instanceof Queue) {
167                    view = new QueueView(this, (Queue)destination);
168                } else if (destination instanceof Topic) {
169                    view = new TopicView(this, (Topic)destination);
170                } else {
171                    view = null;
172                    LOG.warn("JMX View is not supported for custom destination: " + destination);
173                }
174                if (view != null) {
175                    registerDestination(objectName, destName, view);
176                }
177            } catch (Exception e) {
178                LOG.error("Failed to register destination " + destName, e);
179            }
180        }
181    
182        public void unregister(ActiveMQDestination destName) {
183            try {
184                ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
185                unregisterDestination(objectName);
186            } catch (Exception e) {
187                LOG.error("Failed to unregister " + destName, e);
188            }
189        }
190    
191        public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
192            String connectionClientId = context.getClientId();
193    
194            SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
195            try {
196                ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, sub.getConsumerInfo());
197                SubscriptionView view;
198                if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
199                    // add offline subscribers to inactive list
200                    SubscriptionInfo info = new SubscriptionInfo();
201                    info.setClientId(context.getClientId());
202                    info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
203                    info.setDestination(sub.getConsumerInfo().getDestination());
204                    info.setSelector(sub.getSelector());
205                    addInactiveSubscription(key, info, sub);
206                } else {
207                    String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
208                    if (sub.getConsumerInfo().isDurable()) {
209                        view = new DurableSubscriptionView(this, context.getClientId(), userName, sub);
210                    } else {
211                        if (sub instanceof TopicSubscription) {
212                            view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
213                        } else {
214                            view = new SubscriptionView(context.getClientId(), userName, sub);
215                        }
216                    }
217                    registerSubscription(objectName, sub.getConsumerInfo(), key, view);
218                }
219                subscriptionMap.put(sub, objectName);
220                return objectName;
221            } catch (Exception e) {
222                LOG.error("Failed to register subscription " + sub, e);
223                return null;
224            }
225        }
226    
227        @Override
228        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
229            Subscription sub = super.addConsumer(context, info);
230            SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
231            ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
232            if (inactiveName != null) {
233                // if it was inactive, register it
234                registerSubscription(context, sub);
235            }
236            return sub;
237        }
238    
239        @Override
240        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
241            for (Subscription sub : subscriptionMap.keySet()) {
242                if (sub.getConsumerInfo().equals(info)) {
243                   // unregister all consumer subs
244                   unregisterSubscription(subscriptionMap.get(sub), true);
245                }
246            }
247            super.removeConsumer(context, info);
248        }
249    
250        @Override
251        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
252            super.addProducer(context, info);
253            String connectionClientId = context.getClientId();
254            ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
255            String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
256            ProducerView view = new ProducerView(info, connectionClientId, userName, this);
257            registerProducer(objectName, info.getDestination(), view);
258        }
259    
260        @Override
261        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
262            ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
263            unregisterProducer(objectName);
264            super.removeProducer(context, info);
265        }
266    
267        @Override
268        public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
269            if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
270                ProducerInfo info = exchange.getProducerState().getInfo();
271                if (info.getDestination() == null && info.getProducerId() != null) {
272                    ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, exchange.getConnectionContext().getClientId(), info);
273                    ProducerView view = this.dynamicDestinationProducers.get(objectName);
274                    if (view != null) {
275                        ActiveMQDestination dest = message.getDestination();
276                        if (dest != null) {
277                            view.setLastUsedDestinationName(dest);
278                        }
279                    }
280                }
281             }
282            super.send(exchange, message);
283        }
284    
285        public void unregisterSubscription(Subscription sub) {
286            ObjectName name = subscriptionMap.remove(sub);
287            if (name != null) {
288                try {
289                    SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
290                    ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
291                    if (inactiveName != null) {
292                        inactiveDurableTopicSubscribers.remove(inactiveName);
293                        managementContext.unregisterMBean(inactiveName);
294                    }
295                } catch (Exception e) {
296                    LOG.error("Failed to unregister subscription " + sub, e);
297                }
298            }
299        }
300    
301        protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
302            if (dest.isQueue()) {
303                if (dest.isTemporary()) {
304                    temporaryQueues.put(key, view);
305                } else {
306                    queues.put(key, view);
307                }
308            } else {
309                if (dest.isTemporary()) {
310                    temporaryTopics.put(key, view);
311                } else {
312                    topics.put(key, view);
313                }
314            }
315            try {
316                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
317                registeredMBeans.add(key);
318            } catch (Throwable e) {
319                LOG.warn("Failed to register MBean: " + key);
320                LOG.debug("Failure reason: " + e, e);
321            }
322        }
323    
324        protected void unregisterDestination(ObjectName key) throws Exception {
325    
326            DestinationView view = removeAndRemember(topics, key, null);
327            view = removeAndRemember(queues, key, view);
328            view = removeAndRemember(temporaryQueues, key, view);
329            view = removeAndRemember(temporaryTopics, key, view);
330            if (registeredMBeans.remove(key)) {
331                try {
332                    managementContext.unregisterMBean(key);
333                } catch (Throwable e) {
334                    LOG.warn("Failed to unregister MBean: " + key);
335                    LOG.debug("Failure reason: " + e, e);
336                }
337            }
338            if (view != null) {
339                key = view.getSlowConsumerStrategy();
340                if (key!= null && registeredMBeans.remove(key)) {
341                    try {
342                        managementContext.unregisterMBean(key);
343                    } catch (Throwable e) {
344                        LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
345                        LOG.debug("Failure reason: " + e, e);
346                    }
347                }
348            }
349        }
350    
351        protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {
352    
353            if (dest != null) {
354                if (dest.isQueue()) {
355                    if (dest.isTemporary()) {
356                        temporaryQueueProducers.put(key, view);
357                    } else {
358                        queueProducers.put(key, view);
359                    }
360                } else {
361                    if (dest.isTemporary()) {
362                        temporaryTopicProducers.put(key, view);
363                    } else {
364                        topicProducers.put(key, view);
365                    }
366                }
367            } else {
368                dynamicDestinationProducers.put(key, view);
369            }
370    
371            try {
372                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
373                registeredMBeans.add(key);
374            } catch (Throwable e) {
375                LOG.warn("Failed to register MBean: " + key);
376                LOG.debug("Failure reason: " + e, e);
377            }
378        }
379    
380        protected void unregisterProducer(ObjectName key) throws Exception {
381            queueProducers.remove(key);
382            topicProducers.remove(key);
383            temporaryQueueProducers.remove(key);
384            temporaryTopicProducers.remove(key);
385            dynamicDestinationProducers.remove(key);
386            if (registeredMBeans.remove(key)) {
387                try {
388                    managementContext.unregisterMBean(key);
389                } catch (Throwable e) {
390                    LOG.warn("Failed to unregister MBean: " + key);
391                    LOG.debug("Failure reason: " + e, e);
392                }
393            }
394        }
395    
396        private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
397            DestinationView candidate = map.remove(key);
398            if (candidate != null && view == null) {
399                view = candidate;
400            }
401            return candidate != null ? candidate : view;
402        }
403    
404        protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
405            ActiveMQDestination dest = info.getDestination();
406            if (dest.isQueue()) {
407                if (dest.isTemporary()) {
408                    temporaryQueueSubscribers.put(key, view);
409                } else {
410                    queueSubscribers.put(key, view);
411                }
412            } else {
413                if (dest.isTemporary()) {
414                    temporaryTopicSubscribers.put(key, view);
415                } else {
416                    if (info.isDurable()) {
417                        durableTopicSubscribers.put(key, view);
418                        // unregister any inactive durable subs
419                        try {
420                            ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
421                            if (inactiveName != null) {
422                                inactiveDurableTopicSubscribers.remove(inactiveName);
423                                registeredMBeans.remove(inactiveName);
424                                managementContext.unregisterMBean(inactiveName);
425                            }
426                        } catch (Throwable e) {
427                            LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
428                        }
429                    } else {
430                        topicSubscribers.put(key, view);
431                    }
432                }
433            }
434    
435            try {
436                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
437                registeredMBeans.add(key);
438            } catch (Throwable e) {
439                LOG.warn("Failed to register MBean: " + key);
440                LOG.debug("Failure reason: " + e, e);
441            }
442    
443        }
444    
445        protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
446            queueSubscribers.remove(key);
447            topicSubscribers.remove(key);
448            temporaryQueueSubscribers.remove(key);
449            temporaryTopicSubscribers.remove(key);
450            if (registeredMBeans.remove(key)) {
451                try {
452                    managementContext.unregisterMBean(key);
453                } catch (Throwable e) {
454                    LOG.warn("Failed to unregister MBean: " + key);
455                    LOG.debug("Failure reason: " + e, e);
456                }
457            }
458            DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
459            if (view != null) {
460                // need to put this back in the inactive list
461                SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
462                if (addToInactive) {
463                    SubscriptionInfo info = new SubscriptionInfo();
464                    info.setClientId(subscriptionKey.getClientId());
465                    info.setSubscriptionName(subscriptionKey.getSubscriptionName());
466                    info.setDestination(new ActiveMQTopic(view.getDestinationName()));
467                    info.setSelector(view.getSelector());
468                    addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
469                }
470            }
471        }
472    
473        protected void buildExistingSubscriptions() throws Exception {
474            Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
475            Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
476            if (destinations != null) {
477                for (ActiveMQDestination dest : destinations) {
478                    if (dest.isTopic()) {
479                        SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
480                        if (infos != null) {
481                            for (int i = 0; i < infos.length; i++) {
482                                SubscriptionInfo info = infos[i];
483                                SubscriptionKey key = new SubscriptionKey(info);
484                                if (!alreadyKnown(key)) {
485                                    LOG.debug("Restoring durable subscription mbean: " + info);
486                                    subscriptions.put(key, info);
487                                }
488                            }
489                        }
490                    }
491                }
492            }
493    
494            for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
495                addInactiveSubscription(entry.getKey(), entry.getValue(), null);
496            }
497        }
498    
499        private boolean alreadyKnown(SubscriptionKey key) {
500            boolean known = false;
501            known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
502            if (LOG.isTraceEnabled()) {
503                LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") +  " already registered");
504            }
505            return known;
506        }
507    
508        protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
509            try {
510                ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
511                ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo);
512                SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
513    
514                try {
515                    AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
516                    registeredMBeans.add(objectName);
517                } catch (Throwable e) {
518                    LOG.warn("Failed to register MBean: " + key);
519                    LOG.debug("Failure reason: " + e, e);
520                }
521    
522                inactiveDurableTopicSubscribers.put(objectName, view);
523                subscriptionKeys.put(key, objectName);
524            } catch (Exception e) {
525                LOG.error("Failed to register subscription " + info, e);
526            }
527        }
528    
529        public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
530            List<Message> messages = getSubscriberMessages(view);
531            CompositeData c[] = new CompositeData[messages.size()];
532            for (int i = 0; i < c.length; i++) {
533                try {
534                    c[i] = OpenTypeSupport.convert(messages.get(i));
535                } catch (Throwable e) {
536                    LOG.error("failed to browse : " + view, e);
537                }
538            }
539            return c;
540        }
541    
542        public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
543            OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
544            List<Message> messages = getSubscriberMessages(view);
545            CompositeType ct = factory.getCompositeType();
546            TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
547            TabularDataSupport rc = new TabularDataSupport(tt);
548            for (int i = 0; i < messages.size(); i++) {
549                rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
550            }
551            return rc;
552        }
553    
554        protected List<Message> getSubscriberMessages(SubscriptionView view) {
555            // TODO It is very dangerous operation for big backlogs
556            if (!(destinationFactory instanceof DestinationFactoryImpl)) {
557                throw new RuntimeException("unsupported by " + destinationFactory);
558            }
559            PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
560            final List<Message> result = new ArrayList<Message>();
561            try {
562                ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
563                TopicMessageStore store = adapter.createTopicMessageStore(topic);
564                store.recover(new MessageRecoveryListener() {
565                    @Override
566                    public boolean recoverMessage(Message message) throws Exception {
567                        result.add(message);
568                        return true;
569                    }
570    
571                    @Override
572                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
573                        throw new RuntimeException("Should not be called.");
574                    }
575    
576                    @Override
577                    public boolean hasSpace() {
578                        return true;
579                    }
580    
581                    @Override
582                    public boolean isDuplicate(MessageId id) {
583                        return false;
584                    }
585                });
586            } catch (Throwable e) {
587                LOG.error("Failed to browse messages for Subscription " + view, e);
588            }
589            return result;
590    
591        }
592    
593        protected ObjectName[] getTopics() {
594            Set<ObjectName> set = topics.keySet();
595            return set.toArray(new ObjectName[set.size()]);
596        }
597    
598        protected ObjectName[] getQueues() {
599            Set<ObjectName> set = queues.keySet();
600            return set.toArray(new ObjectName[set.size()]);
601        }
602    
603        protected ObjectName[] getTemporaryTopics() {
604            Set<ObjectName> set = temporaryTopics.keySet();
605            return set.toArray(new ObjectName[set.size()]);
606        }
607    
608        protected ObjectName[] getTemporaryQueues() {
609            Set<ObjectName> set = temporaryQueues.keySet();
610            return set.toArray(new ObjectName[set.size()]);
611        }
612    
613        protected ObjectName[] getTopicSubscribers() {
614            Set<ObjectName> set = topicSubscribers.keySet();
615            return set.toArray(new ObjectName[set.size()]);
616        }
617    
618        protected ObjectName[] getDurableTopicSubscribers() {
619            Set<ObjectName> set = durableTopicSubscribers.keySet();
620            return set.toArray(new ObjectName[set.size()]);
621        }
622    
623        protected ObjectName[] getQueueSubscribers() {
624            Set<ObjectName> set = queueSubscribers.keySet();
625            return set.toArray(new ObjectName[set.size()]);
626        }
627    
628        protected ObjectName[] getTemporaryTopicSubscribers() {
629            Set<ObjectName> set = temporaryTopicSubscribers.keySet();
630            return set.toArray(new ObjectName[set.size()]);
631        }
632    
633        protected ObjectName[] getTemporaryQueueSubscribers() {
634            Set<ObjectName> set = temporaryQueueSubscribers.keySet();
635            return set.toArray(new ObjectName[set.size()]);
636        }
637    
638        protected ObjectName[] getInactiveDurableTopicSubscribers() {
639            Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
640            return set.toArray(new ObjectName[set.size()]);
641        }
642    
643        protected ObjectName[] getTopicProducers() {
644            Set<ObjectName> set = topicProducers.keySet();
645            return set.toArray(new ObjectName[set.size()]);
646        }
647    
648        protected ObjectName[] getQueueProducers() {
649            Set<ObjectName> set = queueProducers.keySet();
650            return set.toArray(new ObjectName[set.size()]);
651        }
652    
653        protected ObjectName[] getTemporaryTopicProducers() {
654            Set<ObjectName> set = temporaryTopicProducers.keySet();
655            return set.toArray(new ObjectName[set.size()]);
656        }
657    
658        protected ObjectName[] getTemporaryQueueProducers() {
659            Set<ObjectName> set = temporaryQueueProducers.keySet();
660            return set.toArray(new ObjectName[set.size()]);
661        }
662    
663        protected ObjectName[] getDynamicDestinationProducers() {
664            Set<ObjectName> set = dynamicDestinationProducers.keySet();
665            return set.toArray(new ObjectName[set.size()]);
666        }
667    
668        public Broker getContextBroker() {
669            return contextBroker;
670        }
671    
672        public void setContextBroker(Broker contextBroker) {
673            this.contextBroker = contextBroker;
674        }
675    
676        public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
677            ObjectName objectName = null;
678            try {
679                objectName = BrokerMBeanSupport.createAbortSlowConsumerStrategyName(brokerObjectName, strategy);
680                if (!registeredMBeans.contains(objectName))  {
681                    AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
682                    AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
683                    registeredMBeans.add(objectName);
684                }
685            } catch (Exception e) {
686                LOG.warn("Failed to register MBean: " + strategy);
687                LOG.debug("Failure reason: " + e, e);
688            }
689            return objectName;
690        }
691    
692        public void registerRecoveredTransactionMBean(XATransaction transaction) {
693            try {
694                ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
695                if (!registeredMBeans.contains(objectName))  {
696                    RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
697                    AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
698                    registeredMBeans.add(objectName);
699                }
700            } catch (Exception e) {
701                LOG.warn("Failed to register prepared transaction MBean: " + transaction);
702                LOG.debug("Failure reason: " + e, e);
703            }
704        }
705    
706        public void unregister(XATransaction transaction) {
707            try {
708                ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
709                if (registeredMBeans.remove(objectName)) {
710                    try {
711                        managementContext.unregisterMBean(objectName);
712                    } catch (Throwable e) {
713                        LOG.warn("Failed to unregister MBean: " + objectName);
714                        LOG.debug("Failure reason: " + e, e);
715                    }
716                }
717            } catch (Exception e) {
718                LOG.warn("Failed to create object name to unregister " + transaction, e);
719            }
720        }
721    
722        public ObjectName getSubscriberObjectName(Subscription key) {
723            return subscriptionMap.get(key);
724        }
725    
726        public Subscription getSubscriber(ObjectName key) {
727            Subscription sub = null;
728            for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
729                if (entry.getValue().equals(key)) {
730                    sub = entry.getKey();
731                    break;
732                }
733            }
734            return sub;
735        }
736    
737        public Map<ObjectName, DestinationView> getQueueViews() {
738            return queues;
739        }
740    }