001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.Map.Entry;
026    import java.util.Set;
027    import java.util.concurrent.ConcurrentHashMap;
028    import java.util.concurrent.CopyOnWriteArraySet;
029    import java.util.concurrent.ExecutorService;
030    import java.util.concurrent.ThreadPoolExecutor;
031    
032    import javax.management.InstanceNotFoundException;
033    import javax.management.MalformedObjectNameException;
034    import javax.management.ObjectName;
035    import javax.management.openmbean.CompositeData;
036    import javax.management.openmbean.CompositeDataSupport;
037    import javax.management.openmbean.CompositeType;
038    import javax.management.openmbean.OpenDataException;
039    import javax.management.openmbean.TabularData;
040    import javax.management.openmbean.TabularDataSupport;
041    import javax.management.openmbean.TabularType;
042    
043    import org.apache.activemq.broker.Broker;
044    import org.apache.activemq.broker.BrokerService;
045    import org.apache.activemq.broker.ConnectionContext;
046    import org.apache.activemq.broker.ProducerBrokerExchange;
047    import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
048    import org.apache.activemq.broker.region.Destination;
049    import org.apache.activemq.broker.region.DestinationFactory;
050    import org.apache.activemq.broker.region.DestinationFactoryImpl;
051    import org.apache.activemq.broker.region.DestinationInterceptor;
052    import org.apache.activemq.broker.region.Queue;
053    import org.apache.activemq.broker.region.Region;
054    import org.apache.activemq.broker.region.RegionBroker;
055    import org.apache.activemq.broker.region.Subscription;
056    import org.apache.activemq.broker.region.Topic;
057    import org.apache.activemq.broker.region.TopicRegion;
058    import org.apache.activemq.broker.region.TopicSubscription;
059    import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
060    import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
061    import org.apache.activemq.command.ActiveMQDestination;
062    import org.apache.activemq.command.ActiveMQMessage;
063    import org.apache.activemq.command.ActiveMQTopic;
064    import org.apache.activemq.command.ConsumerInfo;
065    import org.apache.activemq.command.Message;
066    import org.apache.activemq.command.MessageId;
067    import org.apache.activemq.command.ProducerInfo;
068    import org.apache.activemq.command.SubscriptionInfo;
069    import org.apache.activemq.store.MessageRecoveryListener;
070    import org.apache.activemq.store.PersistenceAdapter;
071    import org.apache.activemq.store.TopicMessageStore;
072    import org.apache.activemq.thread.Scheduler;
073    import org.apache.activemq.thread.TaskRunnerFactory;
074    import org.apache.activemq.transaction.XATransaction;
075    import org.apache.activemq.usage.SystemUsage;
076    import org.apache.activemq.util.ServiceStopper;
077    import org.apache.activemq.util.SubscriptionKey;
078    import org.slf4j.Logger;
079    import org.slf4j.LoggerFactory;
080    
081    public class ManagedRegionBroker extends RegionBroker {
082        private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
083        private final ManagementContext managementContext;
084        private final ObjectName brokerObjectName;
085        private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
086        private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
087        private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
088        private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
089        private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
090        private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
091        private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
092        private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
093        private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
094        private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
095        private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
096        private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
097        private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
098        private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
099        private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
100        private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
101        private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
102        private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
103        /* This is the first broker in the broker interceptor chain. */
104        private Broker contextBroker;
105    
106        private final ExecutorService asyncInvokeService;
107        private final long mbeanTimeout;
108    
109        public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
110                                   DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
111            super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
112            this.managementContext = context;
113            this.brokerObjectName = brokerObjectName;
114            this.mbeanTimeout = brokerService.getMbeanInvocationTimeout();
115            this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;;
116        }
117    
118        @Override
119        public void start() throws Exception {
120            super.start();
121            // build all existing durable subscriptions
122            buildExistingSubscriptions();
123        }
124    
125        @Override
126        protected void doStop(ServiceStopper stopper) {
127            super.doStop(stopper);
128            // lets remove any mbeans not yet removed
129            for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
130                ObjectName name = iter.next();
131                try {
132                    managementContext.unregisterMBean(name);
133                } catch (InstanceNotFoundException e) {
134                    LOG.warn("The MBean {} is no longer registered with JMX", name);
135                } catch (Exception e) {
136                    stopper.onException(this, e);
137                }
138            }
139            registeredMBeans.clear();
140        }
141    
142        @Override
143        protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
144            return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
145        }
146    
147        @Override
148        protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
149            return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
150        }
151    
152        @Override
153        protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
154            return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
155        }
156    
157        @Override
158        protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
159            return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
160        }
161    
162        public void register(ActiveMQDestination destName, Destination destination) {
163            // TODO refactor to allow views for custom destinations
164            try {
165                ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
166                DestinationView view;
167                if (destination instanceof Queue) {
168                    view = new QueueView(this, (Queue)destination);
169                } else if (destination instanceof Topic) {
170                    view = new TopicView(this, (Topic)destination);
171                } else {
172                    view = null;
173                    LOG.warn("JMX View is not supported for custom destination {}", destination);
174                }
175                if (view != null) {
176                    registerDestination(objectName, destName, view);
177                }
178            } catch (Exception e) {
179                LOG.error("Failed to register destination {}", destName, e);
180            }
181        }
182    
183        public void unregister(ActiveMQDestination destName) {
184            try {
185                ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
186                unregisterDestination(objectName);
187            } catch (Exception e) {
188                LOG.error("Failed to unregister {}", destName, e);
189            }
190        }
191    
192        public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
193            String connectionClientId = context.getClientId();
194    
195            SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
196            try {
197                ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, sub.getConsumerInfo());
198                SubscriptionView view;
199                if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
200                    // add offline subscribers to inactive list
201                    SubscriptionInfo info = new SubscriptionInfo();
202                    info.setClientId(context.getClientId());
203                    info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
204                    info.setDestination(sub.getConsumerInfo().getDestination());
205                    info.setSelector(sub.getSelector());
206                    addInactiveSubscription(key, info, sub);
207                } else {
208                    String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
209                    if (sub.getConsumerInfo().isDurable()) {
210                        view = new DurableSubscriptionView(this, brokerService, context.getClientId(), userName, sub);
211                    } else {
212                        if (sub instanceof TopicSubscription) {
213                            view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
214                        } else {
215                            view = new SubscriptionView(context.getClientId(), userName, sub);
216                        }
217                    }
218                    registerSubscription(objectName, sub.getConsumerInfo(), key, view);
219                }
220                subscriptionMap.put(sub, objectName);
221                return objectName;
222            } catch (Exception e) {
223                LOG.error("Failed to register subscription {}", sub, e);
224                return null;
225            }
226        }
227    
228        @Override
229        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
230            Subscription sub = super.addConsumer(context, info);
231            SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
232            ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
233            if (inactiveName != null) {
234                // if it was inactive, register it
235                registerSubscription(context, sub);
236            }
237            return sub;
238        }
239    
240        @Override
241        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
242            for (Subscription sub : subscriptionMap.keySet()) {
243                if (sub.getConsumerInfo().equals(info)) {
244                   // unregister all consumer subs
245                   unregisterSubscription(subscriptionMap.get(sub), true);
246                }
247            }
248            super.removeConsumer(context, info);
249        }
250    
251        @Override
252        public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
253            super.addProducer(context, info);
254            String connectionClientId = context.getClientId();
255            ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
256            String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
257            ProducerView view = new ProducerView(info, connectionClientId, userName, this);
258            registerProducer(objectName, info.getDestination(), view);
259        }
260    
261        @Override
262        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
263            ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
264            unregisterProducer(objectName);
265            super.removeProducer(context, info);
266        }
267    
268        @Override
269        public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
270            if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
271                ProducerInfo info = exchange.getProducerState().getInfo();
272                if (info.getDestination() == null && info.getProducerId() != null) {
273                    ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, exchange.getConnectionContext().getClientId(), info);
274                    ProducerView view = this.dynamicDestinationProducers.get(objectName);
275                    if (view != null) {
276                        ActiveMQDestination dest = message.getDestination();
277                        if (dest != null) {
278                            view.setLastUsedDestinationName(dest);
279                        }
280                    }
281                }
282             }
283            super.send(exchange, message);
284        }
285    
286        public void unregisterSubscription(Subscription sub) {
287            ObjectName name = subscriptionMap.remove(sub);
288            if (name != null) {
289                try {
290                    SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
291                    ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
292                    if (inactiveName != null) {
293                        inactiveDurableTopicSubscribers.remove(inactiveName);
294                        managementContext.unregisterMBean(inactiveName);
295                    }
296                } catch (Exception e) {
297                    LOG.error("Failed to unregister subscription {}", sub, e);
298                }
299            }
300        }
301    
302        protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
303            if (dest.isQueue()) {
304                if (dest.isTemporary()) {
305                    temporaryQueues.put(key, view);
306                } else {
307                    queues.put(key, view);
308                }
309            } else {
310                if (dest.isTemporary()) {
311                    temporaryTopics.put(key, view);
312                } else {
313                    topics.put(key, view);
314                }
315            }
316            try {
317                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
318                registeredMBeans.add(key);
319            } catch (Throwable e) {
320                LOG.warn("Failed to register MBean {}", key);
321                LOG.debug("Failure reason: ", e);
322            }
323        }
324    
325        protected void unregisterDestination(ObjectName key) throws Exception {
326    
327            DestinationView view = removeAndRemember(topics, key, null);
328            view = removeAndRemember(queues, key, view);
329            view = removeAndRemember(temporaryQueues, key, view);
330            view = removeAndRemember(temporaryTopics, key, view);
331            if (registeredMBeans.remove(key)) {
332                try {
333                    managementContext.unregisterMBean(key);
334                } catch (Throwable e) {
335                    LOG.warn("Failed to unregister MBean {}", key);
336                    LOG.debug("Failure reason: ", e);
337                }
338            }
339            if (view != null) {
340                key = view.getSlowConsumerStrategy();
341                if (key!= null && registeredMBeans.remove(key)) {
342                    try {
343                        managementContext.unregisterMBean(key);
344                    } catch (Throwable e) {
345                        LOG.warn("Failed to unregister slow consumer strategy MBean {}", key);
346                        LOG.debug("Failure reason: ", e);
347                    }
348                }
349            }
350        }
351    
352        protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {
353    
354            if (dest != null) {
355                if (dest.isQueue()) {
356                    if (dest.isTemporary()) {
357                        temporaryQueueProducers.put(key, view);
358                    } else {
359                        queueProducers.put(key, view);
360                    }
361                } else {
362                    if (dest.isTemporary()) {
363                        temporaryTopicProducers.put(key, view);
364                    } else {
365                        topicProducers.put(key, view);
366                    }
367                }
368            } else {
369                dynamicDestinationProducers.put(key, view);
370            }
371    
372            try {
373                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
374                registeredMBeans.add(key);
375            } catch (Throwable e) {
376                LOG.warn("Failed to register MBean {}", key);
377                LOG.debug("Failure reason: ", e);
378            }
379        }
380    
381        protected void unregisterProducer(ObjectName key) throws Exception {
382            queueProducers.remove(key);
383            topicProducers.remove(key);
384            temporaryQueueProducers.remove(key);
385            temporaryTopicProducers.remove(key);
386            dynamicDestinationProducers.remove(key);
387            if (registeredMBeans.remove(key)) {
388                try {
389                    managementContext.unregisterMBean(key);
390                } catch (Throwable e) {
391                    LOG.warn("Failed to unregister MBean {}", key);
392                    LOG.debug("Failure reason: ", e);
393                }
394            }
395        }
396    
397        private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
398            DestinationView candidate = map.remove(key);
399            if (candidate != null && view == null) {
400                view = candidate;
401            }
402            return candidate != null ? candidate : view;
403        }
404    
405        protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
406            ActiveMQDestination dest = info.getDestination();
407            if (dest.isQueue()) {
408                if (dest.isTemporary()) {
409                    temporaryQueueSubscribers.put(key, view);
410                } else {
411                    queueSubscribers.put(key, view);
412                }
413            } else {
414                if (dest.isTemporary()) {
415                    temporaryTopicSubscribers.put(key, view);
416                } else {
417                    if (info.isDurable()) {
418                        durableTopicSubscribers.put(key, view);
419                        // unregister any inactive durable subs
420                        try {
421                            ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
422                            if (inactiveName != null) {
423                                inactiveDurableTopicSubscribers.remove(inactiveName);
424                                registeredMBeans.remove(inactiveName);
425                                managementContext.unregisterMBean(inactiveName);
426                            }
427                        } catch (Throwable e) {
428                            LOG.error("Unable to unregister inactive durable subscriber {}", subscriptionKey, e);
429                        }
430                    } else {
431                        topicSubscribers.put(key, view);
432                    }
433                }
434            }
435    
436            try {
437                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
438                registeredMBeans.add(key);
439            } catch (Throwable e) {
440                LOG.warn("Failed to register MBean {}", key);
441                LOG.debug("Failure reason: ", e);
442            }
443    
444        }
445    
446        protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
447            queueSubscribers.remove(key);
448            topicSubscribers.remove(key);
449            temporaryQueueSubscribers.remove(key);
450            temporaryTopicSubscribers.remove(key);
451            if (registeredMBeans.remove(key)) {
452                try {
453                    managementContext.unregisterMBean(key);
454                } catch (Throwable e) {
455                    LOG.warn("Failed to unregister MBean {}", key);
456                    LOG.debug("Failure reason: ", e);
457                }
458            }
459            DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
460            if (view != null) {
461                // need to put this back in the inactive list
462                SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
463                if (addToInactive) {
464                    SubscriptionInfo info = new SubscriptionInfo();
465                    info.setClientId(subscriptionKey.getClientId());
466                    info.setSubscriptionName(subscriptionKey.getSubscriptionName());
467                    info.setDestination(new ActiveMQTopic(view.getDestinationName()));
468                    info.setSelector(view.getSelector());
469                    addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
470                }
471            }
472        }
473    
474        protected void buildExistingSubscriptions() throws Exception {
475            Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
476            Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
477            if (destinations != null) {
478                for (ActiveMQDestination dest : destinations) {
479                    if (dest.isTopic()) {
480                        SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
481                        if (infos != null) {
482                            for (int i = 0; i < infos.length; i++) {
483                                SubscriptionInfo info = infos[i];
484                                SubscriptionKey key = new SubscriptionKey(info);
485                                if (!alreadyKnown(key)) {
486                                    LOG.debug("Restoring durable subscription MBean {}", info);
487                                    subscriptions.put(key, info);
488                                }
489                            }
490                        }
491                    }
492                }
493            }
494    
495            for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
496                addInactiveSubscription(entry.getKey(), entry.getValue(), null);
497            }
498        }
499    
500        private boolean alreadyKnown(SubscriptionKey key) {
501            boolean known = false;
502            known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
503            LOG.trace("Sub with key: {}, {} already registered", key, (known ? "": "not"));
504            return known;
505        }
506    
507        protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
508            try {
509                ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
510                ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo);
511                SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription);
512    
513                try {
514                    AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
515                    registeredMBeans.add(objectName);
516                } catch (Throwable e) {
517                    LOG.warn("Failed to register MBean {}", key);
518                    LOG.debug("Failure reason: ", e);
519                }
520    
521                inactiveDurableTopicSubscribers.put(objectName, view);
522                subscriptionKeys.put(key, objectName);
523            } catch (Exception e) {
524                LOG.error("Failed to register subscription {}", info, e);
525            }
526        }
527    
528        public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
529            List<Message> messages = getSubscriberMessages(view);
530            CompositeData c[] = new CompositeData[messages.size()];
531            for (int i = 0; i < c.length; i++) {
532                try {
533                    c[i] = OpenTypeSupport.convert(messages.get(i));
534                } catch (Throwable e) {
535                    LOG.error("Failed to browse: {}", view, e);
536                }
537            }
538            return c;
539        }
540    
541        public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
542            OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
543            List<Message> messages = getSubscriberMessages(view);
544            CompositeType ct = factory.getCompositeType();
545            TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
546            TabularDataSupport rc = new TabularDataSupport(tt);
547            for (int i = 0; i < messages.size(); i++) {
548                rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
549            }
550            return rc;
551        }
552    
553        protected List<Message> getSubscriberMessages(SubscriptionView view) {
554            // TODO It is very dangerous operation for big backlogs
555            if (!(destinationFactory instanceof DestinationFactoryImpl)) {
556                throw new RuntimeException("unsupported by " + destinationFactory);
557            }
558            PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
559            final List<Message> result = new ArrayList<Message>();
560            try {
561                ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
562                TopicMessageStore store = adapter.createTopicMessageStore(topic);
563                store.recover(new MessageRecoveryListener() {
564                    @Override
565                    public boolean recoverMessage(Message message) throws Exception {
566                        result.add(message);
567                        return true;
568                    }
569    
570                    @Override
571                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
572                        throw new RuntimeException("Should not be called.");
573                    }
574    
575                    @Override
576                    public boolean hasSpace() {
577                        return true;
578                    }
579    
580                    @Override
581                    public boolean isDuplicate(MessageId id) {
582                        return false;
583                    }
584                });
585            } catch (Throwable e) {
586                LOG.error("Failed to browse messages for Subscription {}", view, e);
587            }
588            return result;
589    
590        }
591    
592        protected ObjectName[] getTopics() {
593            Set<ObjectName> set = topics.keySet();
594            return set.toArray(new ObjectName[set.size()]);
595        }
596    
597        protected ObjectName[] getQueues() {
598            Set<ObjectName> set = queues.keySet();
599            return set.toArray(new ObjectName[set.size()]);
600        }
601    
602        protected ObjectName[] getTemporaryTopics() {
603            Set<ObjectName> set = temporaryTopics.keySet();
604            return set.toArray(new ObjectName[set.size()]);
605        }
606    
607        protected ObjectName[] getTemporaryQueues() {
608            Set<ObjectName> set = temporaryQueues.keySet();
609            return set.toArray(new ObjectName[set.size()]);
610        }
611    
612        protected ObjectName[] getTopicSubscribers() {
613            Set<ObjectName> set = topicSubscribers.keySet();
614            return set.toArray(new ObjectName[set.size()]);
615        }
616    
617        protected ObjectName[] getDurableTopicSubscribers() {
618            Set<ObjectName> set = durableTopicSubscribers.keySet();
619            return set.toArray(new ObjectName[set.size()]);
620        }
621    
622        protected ObjectName[] getQueueSubscribers() {
623            Set<ObjectName> set = queueSubscribers.keySet();
624            return set.toArray(new ObjectName[set.size()]);
625        }
626    
627        protected ObjectName[] getTemporaryTopicSubscribers() {
628            Set<ObjectName> set = temporaryTopicSubscribers.keySet();
629            return set.toArray(new ObjectName[set.size()]);
630        }
631    
632        protected ObjectName[] getTemporaryQueueSubscribers() {
633            Set<ObjectName> set = temporaryQueueSubscribers.keySet();
634            return set.toArray(new ObjectName[set.size()]);
635        }
636    
637        protected ObjectName[] getInactiveDurableTopicSubscribers() {
638            Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
639            return set.toArray(new ObjectName[set.size()]);
640        }
641    
642        protected ObjectName[] getTopicProducers() {
643            Set<ObjectName> set = topicProducers.keySet();
644            return set.toArray(new ObjectName[set.size()]);
645        }
646    
647        protected ObjectName[] getQueueProducers() {
648            Set<ObjectName> set = queueProducers.keySet();
649            return set.toArray(new ObjectName[set.size()]);
650        }
651    
652        protected ObjectName[] getTemporaryTopicProducers() {
653            Set<ObjectName> set = temporaryTopicProducers.keySet();
654            return set.toArray(new ObjectName[set.size()]);
655        }
656    
657        protected ObjectName[] getTemporaryQueueProducers() {
658            Set<ObjectName> set = temporaryQueueProducers.keySet();
659            return set.toArray(new ObjectName[set.size()]);
660        }
661    
662        protected ObjectName[] getDynamicDestinationProducers() {
663            Set<ObjectName> set = dynamicDestinationProducers.keySet();
664            return set.toArray(new ObjectName[set.size()]);
665        }
666    
667        public Broker getContextBroker() {
668            return contextBroker;
669        }
670    
671        public void setContextBroker(Broker contextBroker) {
672            this.contextBroker = contextBroker;
673        }
674    
675        public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
676            ObjectName objectName = null;
677            try {
678                objectName = BrokerMBeanSupport.createAbortSlowConsumerStrategyName(brokerObjectName, strategy);
679                if (!registeredMBeans.contains(objectName))  {
680    
681                    AbortSlowConsumerStrategyView view = null;
682                    if (strategy instanceof AbortSlowAckConsumerStrategy) {
683                        view = new AbortSlowAckConsumerStrategyView(this, (AbortSlowAckConsumerStrategy) strategy);
684                    } else {
685                        view = new AbortSlowConsumerStrategyView(this, strategy);
686                    }
687    
688                    AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
689                    registeredMBeans.add(objectName);
690                }
691            } catch (Exception e) {
692                LOG.warn("Failed to register MBean {}", strategy);
693                LOG.debug("Failure reason: ", e);
694            }
695            return objectName;
696        }
697    
698        public void registerRecoveredTransactionMBean(XATransaction transaction) {
699            try {
700                ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
701                if (!registeredMBeans.contains(objectName))  {
702                    RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
703                    AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
704                    registeredMBeans.add(objectName);
705                }
706            } catch (Exception e) {
707                LOG.warn("Failed to register prepared transaction MBean {}", transaction);
708                LOG.debug("Failure reason: ", e);
709            }
710        }
711    
712        public void unregister(XATransaction transaction) {
713            try {
714                ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
715                if (registeredMBeans.remove(objectName)) {
716                    try {
717                        managementContext.unregisterMBean(objectName);
718                    } catch (Throwable e) {
719                        LOG.warn("Failed to unregister MBean {}", objectName);
720                        LOG.debug("Failure reason: ", e);
721                    }
722                }
723            } catch (Exception e) {
724                LOG.warn("Failed to create object name to unregister {}", transaction, e);
725            }
726        }
727    
728        public ObjectName getSubscriberObjectName(Subscription key) {
729            return subscriptionMap.get(key);
730        }
731    
732        public Subscription getSubscriber(ObjectName key) {
733            Subscription sub = null;
734            for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
735                if (entry.getValue().equals(key)) {
736                    sub = entry.getKey();
737                    break;
738                }
739            }
740            return sub;
741        }
742    
743        public Map<ObjectName, DestinationView> getQueueViews() {
744            return queues;
745        }
746    }