001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Hashtable;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.Map;
026    import java.util.Map.Entry;
027    import java.util.Set;
028    import java.util.concurrent.ConcurrentHashMap;
029    import java.util.concurrent.CopyOnWriteArraySet;
030    import java.util.concurrent.ExecutorService;
031    import java.util.concurrent.ThreadPoolExecutor;
032    
033    import javax.management.InstanceNotFoundException;
034    import javax.management.MalformedObjectNameException;
035    import javax.management.ObjectName;
036    import javax.management.openmbean.CompositeData;
037    import javax.management.openmbean.CompositeDataSupport;
038    import javax.management.openmbean.CompositeType;
039    import javax.management.openmbean.OpenDataException;
040    import javax.management.openmbean.TabularData;
041    import javax.management.openmbean.TabularDataSupport;
042    import javax.management.openmbean.TabularType;
043    
044    import org.apache.activemq.broker.Broker;
045    import org.apache.activemq.broker.BrokerService;
046    import org.apache.activemq.broker.ConnectionContext;
047    import org.apache.activemq.broker.ProducerBrokerExchange;
048    import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
049    import org.apache.activemq.broker.region.Destination;
050    import org.apache.activemq.broker.region.DestinationFactory;
051    import org.apache.activemq.broker.region.DestinationFactoryImpl;
052    import org.apache.activemq.broker.region.DestinationInterceptor;
053    import org.apache.activemq.broker.region.Queue;
054    import org.apache.activemq.broker.region.Region;
055    import org.apache.activemq.broker.region.RegionBroker;
056    import org.apache.activemq.broker.region.Subscription;
057    import org.apache.activemq.broker.region.Topic;
058    import org.apache.activemq.broker.region.TopicRegion;
059    import org.apache.activemq.broker.region.TopicSubscription;
060    import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
061    import org.apache.activemq.command.ActiveMQDestination;
062    import org.apache.activemq.command.ActiveMQMessage;
063    import org.apache.activemq.command.ActiveMQTopic;
064    import org.apache.activemq.command.ConsumerInfo;
065    import org.apache.activemq.command.Message;
066    import org.apache.activemq.command.MessageId;
067    import org.apache.activemq.command.ProducerInfo;
068    import org.apache.activemq.command.SubscriptionInfo;
069    import org.apache.activemq.store.MessageRecoveryListener;
070    import org.apache.activemq.store.PersistenceAdapter;
071    import org.apache.activemq.store.TopicMessageStore;
072    import org.apache.activemq.thread.Scheduler;
073    import org.apache.activemq.thread.TaskRunnerFactory;
074    import org.apache.activemq.transaction.XATransaction;
075    import org.apache.activemq.usage.SystemUsage;
076    import org.apache.activemq.util.JMXSupport;
077    import org.apache.activemq.util.ServiceStopper;
078    import org.apache.activemq.util.SubscriptionKey;
079    import org.slf4j.Logger;
080    import org.slf4j.LoggerFactory;
081    
082    public class ManagedRegionBroker extends RegionBroker {
083        private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
084        private final ManagementContext managementContext;
085        private final ObjectName brokerObjectName;
086        private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
087        private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
088        private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
089        private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
090        private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
091        private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
092        private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
093        private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
094        private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
095        private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
096        private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
097        private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
098        private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
099        private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
100        private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
101        private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
102        private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
103        private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
104        /* This is the first broker in the broker interceptor chain. */
105        private Broker contextBroker;
106    
107        private final ExecutorService asyncInvokeService;
108        private final long mbeanTimeout;
109    
110        public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
111                                   DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
112            super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
113            this.managementContext = context;
114            this.brokerObjectName = brokerObjectName;
115            this.mbeanTimeout = brokerService.getMbeanInvocationTimeout();
116            this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;;
117        }
118    
119        @Override
120        public void start() throws Exception {
121            super.start();
122            // build all existing durable subscriptions
123            buildExistingSubscriptions();
124        }
125    
126        @Override
127        protected void doStop(ServiceStopper stopper) {
128            super.doStop(stopper);
129            // lets remove any mbeans not yet removed
130            for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
131                ObjectName name = iter.next();
132                try {
133                    managementContext.unregisterMBean(name);
134                } catch (InstanceNotFoundException e) {
135                    LOG.warn("The MBean: " + name + " is no longer registered with JMX");
136                } catch (Exception e) {
137                    stopper.onException(this, e);
138                }
139            }
140            registeredMBeans.clear();
141        }
142    
143        @Override
144        protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
145            return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
146        }
147    
148        @Override
149        protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
150            return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
151        }
152    
153        @Override
154        protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
155            return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
156        }
157    
158        @Override
159        protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
160            return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
161        }
162    
163        public void register(ActiveMQDestination destName, Destination destination) {
164            // TODO refactor to allow views for custom destinations
165            try {
166                ObjectName objectName = createObjectName(destName);
167                DestinationView view;
168                if (destination instanceof Queue) {
169                    view = new QueueView(this, (Queue)destination);
170                } else if (destination instanceof Topic) {
171                    view = new TopicView(this, (Topic)destination);
172                } else {
173                    view = null;
174                    LOG.warn("JMX View is not supported for custom destination: " + destination);
175                }
176                if (view != null) {
177                    registerDestination(objectName, destName, view);
178                }
179            } catch (Exception e) {
180                LOG.error("Failed to register destination " + destName, e);
181            }
182        }
183    
184        public void unregister(ActiveMQDestination destName) {
185            try {
186                ObjectName objectName = createObjectName(destName);
187                unregisterDestination(objectName);
188            } catch (Exception e) {
189                LOG.error("Failed to unregister " + destName, e);
190            }
191        }
192    
193        public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
194            String connectionClientId = context.getClientId();
195            ObjectName brokerJmxObjectName = brokerObjectName;
196            String objectNameStr = getSubscriptionObjectName(sub.getConsumerInfo(), connectionClientId, brokerJmxObjectName);
197            SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
198            try {
199                ObjectName objectName = new ObjectName(objectNameStr);
200                SubscriptionView view;
201                if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
202                    // add offline subscribers to inactive list
203                    SubscriptionInfo info = new SubscriptionInfo();
204                    info.setClientId(context.getClientId());
205                    info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
206                    info.setDestination(sub.getConsumerInfo().getDestination());
207                    info.setSelector(sub.getSelector());
208                    addInactiveSubscription(key, info, sub);
209                } else {
210                    String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
211                    if (sub.getConsumerInfo().isDurable()) {
212                        view = new DurableSubscriptionView(this, context.getClientId(), userName, sub);
213                    } else {
214                        if (sub instanceof TopicSubscription) {
215                            view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
216                        } else {
217                            view = new SubscriptionView(context.getClientId(), userName, sub);
218                        }
219                    }
220                    registerSubscription(objectName, sub.getConsumerInfo(), key, view);
221                }
222                subscriptionMap.put(sub, objectName);
223                return objectName;
224            } catch (Exception e) {
225                LOG.error("Failed to register subscription " + sub, e);
226                return null;
227            }
228        }
229    
230        public static String getSubscriptionObjectName(ConsumerInfo info, String connectionClientId, ObjectName brokerJmxObjectName) {
231            Hashtable<String, String> map = brokerJmxObjectName.getKeyPropertyList();
232            String brokerDomain = brokerJmxObjectName.getDomain();
233            String objectNameStr = brokerDomain + ":" + "BrokerName=" + map.get("BrokerName") + ",Type=Subscription,";
234            String destinationType = "destinationType=" + info.getDestination().getDestinationTypeAsString();
235            String destinationName = "destinationName=" + JMXSupport.encodeObjectNamePart(info.getDestination().getPhysicalName());
236            String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
237            String persistentMode = "persistentMode=";
238            String consumerId = "";
239            if (info.isDurable()) {
240                persistentMode += "Durable,subscriptionID=" + JMXSupport.encodeObjectNamePart(info.getSubscriptionName());
241            } else {
242                persistentMode += "Non-Durable";
243                if (info.getConsumerId() != null) {
244                    consumerId = ",consumerId=" + JMXSupport.encodeObjectNamePart(info.getConsumerId().toString());
245                }
246            }
247            objectNameStr += persistentMode + ",";
248            objectNameStr += destinationType + ",";
249            objectNameStr += destinationName + ",";
250            objectNameStr += clientId;
251            objectNameStr += consumerId;
252            return objectNameStr;
253        }
254    
255        @Override
256        public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
257            Subscription sub = super.addConsumer(context, info);
258            SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
259            ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
260            if (inactiveName != null) {
261                // if it was inactive, register it
262                registerSubscription(context, sub);
263            }
264            return sub;
265        }
266    
267        @Override
268        public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
269            for (Subscription sub : subscriptionMap.keySet()) {
270                if (sub.getConsumerInfo().equals(info)) {
271                   // unregister all consumer subs
272                   unregisterSubscription(subscriptionMap.get(sub), true);
273                }
274            }
275            super.removeConsumer(context, info);
276        }
277    
278        @Override
279        public void addProducer(ConnectionContext context, ProducerInfo info)
280                throws Exception {
281            super.addProducer(context, info);
282            String connectionClientId = context.getClientId();
283            ObjectName objectName = createObjectName(info, connectionClientId);
284            String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
285            ProducerView view = new ProducerView(info, connectionClientId, userName, this);
286            registerProducer(objectName, info.getDestination(), view);
287        }
288    
289        @Override
290        public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
291            ObjectName objectName = createObjectName(info, context.getClientId());
292            unregisterProducer(objectName);
293            super.removeProducer(context, info);
294        }
295    
296        @Override
297        public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
298            if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
299                ProducerInfo info = exchange.getProducerState().getInfo();
300                if (info.getDestination() == null && info.getProducerId() != null) {
301                    ObjectName objectName = createObjectName(info, exchange.getConnectionContext().getClientId());
302                    ProducerView view = this.dynamicDestinationProducers.get(objectName);
303                    if (view != null) {
304                        ActiveMQDestination dest = message.getDestination();
305                        if (dest != null) {
306                            view.setLastUsedDestinationName(dest);
307                        }
308                    }
309                }
310             }
311            super.send(exchange, message);
312        }
313    
314        public void unregisterSubscription(Subscription sub) {
315            ObjectName name = subscriptionMap.remove(sub);
316            if (name != null) {
317                try {
318                    SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
319                    ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
320                    if (inactiveName != null) {
321                        inactiveDurableTopicSubscribers.remove(inactiveName);
322                        managementContext.unregisterMBean(inactiveName);
323                    }
324                } catch (Exception e) {
325                    LOG.error("Failed to unregister subscription " + sub, e);
326                }
327            }
328        }
329    
330        protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
331            if (dest.isQueue()) {
332                if (dest.isTemporary()) {
333                    temporaryQueues.put(key, view);
334                } else {
335                    queues.put(key, view);
336                }
337            } else {
338                if (dest.isTemporary()) {
339                    temporaryTopics.put(key, view);
340                } else {
341                    topics.put(key, view);
342                }
343            }
344            try {
345                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
346                registeredMBeans.add(key);
347            } catch (Throwable e) {
348                LOG.warn("Failed to register MBean: " + key);
349                LOG.debug("Failure reason: " + e, e);
350            }
351        }
352    
353        protected void unregisterDestination(ObjectName key) throws Exception {
354    
355            DestinationView view = removeAndRemember(topics, key, null);
356            view = removeAndRemember(queues, key, view);
357            view = removeAndRemember(temporaryQueues, key, view);
358            view = removeAndRemember(temporaryTopics, key, view);
359            if (registeredMBeans.remove(key)) {
360                try {
361                    managementContext.unregisterMBean(key);
362                } catch (Throwable e) {
363                    LOG.warn("Failed to unregister MBean: " + key);
364                    LOG.debug("Failure reason: " + e, e);
365                }
366            }
367            if (view != null) {
368                key = view.getSlowConsumerStrategy();
369                if (key!= null && registeredMBeans.remove(key)) {
370                    try {
371                        managementContext.unregisterMBean(key);
372                    } catch (Throwable e) {
373                        LOG.warn("Failed to unregister slow consumer strategy MBean: " + key);
374                        LOG.debug("Failure reason: " + e, e);
375                    }
376                }
377            }
378        }
379    
380        protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {
381    
382            if (dest != null) {
383                if (dest.isQueue()) {
384                    if (dest.isTemporary()) {
385                        temporaryQueueProducers.put(key, view);
386                    } else {
387                        queueProducers.put(key, view);
388                    }
389                } else {
390                    if (dest.isTemporary()) {
391                        temporaryTopicProducers.put(key, view);
392                    } else {
393                        topicProducers.put(key, view);
394                    }
395                }
396            } else {
397                dynamicDestinationProducers.put(key, view);
398            }
399    
400            try {
401                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
402                registeredMBeans.add(key);
403            } catch (Throwable e) {
404                LOG.warn("Failed to register MBean: " + key);
405                LOG.debug("Failure reason: " + e, e);
406            }
407        }
408    
409        protected void unregisterProducer(ObjectName key) throws Exception {
410            queueProducers.remove(key);
411            topicProducers.remove(key);
412            temporaryQueueProducers.remove(key);
413            temporaryTopicProducers.remove(key);
414            dynamicDestinationProducers.remove(key);
415            if (registeredMBeans.remove(key)) {
416                try {
417                    managementContext.unregisterMBean(key);
418                } catch (Throwable e) {
419                    LOG.warn("Failed to unregister MBean: " + key);
420                    LOG.debug("Failure reason: " + e, e);
421                }
422            }
423        }
424    
425        private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
426            DestinationView candidate = map.remove(key);
427            if (candidate != null && view == null) {
428                view = candidate;
429            }
430            return candidate != null ? candidate : view;
431        }
432    
433        protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
434            ActiveMQDestination dest = info.getDestination();
435            if (dest.isQueue()) {
436                if (dest.isTemporary()) {
437                    temporaryQueueSubscribers.put(key, view);
438                } else {
439                    queueSubscribers.put(key, view);
440                }
441            } else {
442                if (dest.isTemporary()) {
443                    temporaryTopicSubscribers.put(key, view);
444                } else {
445                    if (info.isDurable()) {
446                        durableTopicSubscribers.put(key, view);
447                        // unregister any inactive durable subs
448                        try {
449                            ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
450                            if (inactiveName != null) {
451                                inactiveDurableTopicSubscribers.remove(inactiveName);
452                                registeredMBeans.remove(inactiveName);
453                                managementContext.unregisterMBean(inactiveName);
454                            }
455                        } catch (Throwable e) {
456                            LOG.error("Unable to unregister inactive durable subscriber: " + subscriptionKey, e);
457                        }
458                    } else {
459                        topicSubscribers.put(key, view);
460                    }
461                }
462            }
463    
464            try {
465                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
466                registeredMBeans.add(key);
467            } catch (Throwable e) {
468                LOG.warn("Failed to register MBean: " + key);
469                LOG.debug("Failure reason: " + e, e);
470            }
471    
472        }
473    
474        protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
475            queueSubscribers.remove(key);
476            topicSubscribers.remove(key);
477            temporaryQueueSubscribers.remove(key);
478            temporaryTopicSubscribers.remove(key);
479            if (registeredMBeans.remove(key)) {
480                try {
481                    managementContext.unregisterMBean(key);
482                } catch (Throwable e) {
483                    LOG.warn("Failed to unregister MBean: " + key);
484                    LOG.debug("Failure reason: " + e, e);
485                }
486            }
487            DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
488            if (view != null) {
489                // need to put this back in the inactive list
490                SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
491                if (addToInactive) {
492                    SubscriptionInfo info = new SubscriptionInfo();
493                    info.setClientId(subscriptionKey.getClientId());
494                    info.setSubscriptionName(subscriptionKey.getSubscriptionName());
495                    info.setDestination(new ActiveMQTopic(view.getDestinationName()));
496                    info.setSelector(view.getSelector());
497                    addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
498                }
499            }
500        }
501    
502        protected void buildExistingSubscriptions() throws Exception {
503            Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
504            Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
505            if (destinations != null) {
506                for (ActiveMQDestination dest : destinations) {
507                    if (dest.isTopic()) {
508                        SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
509                        if (infos != null) {
510                            for (int i = 0; i < infos.length; i++) {
511                                SubscriptionInfo info = infos[i];
512                                SubscriptionKey key = new SubscriptionKey(info);
513                                if (!alreadyKnown(key)) {
514                                    LOG.debug("Restoring durable subscription mbean: " + info);
515                                    subscriptions.put(key, info);
516                                }
517                            }
518                        }
519                    }
520                }
521            }
522    
523            for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
524                addInactiveSubscription(entry.getKey(), entry.getValue(), null);
525            }
526        }
527    
528        private boolean alreadyKnown(SubscriptionKey key) {
529            boolean known = false;
530            known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
531            if (LOG.isTraceEnabled()) {
532                LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") +  " already registered");
533            }
534            return known;
535        }
536    
537        protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
538            try {
539                ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
540                ObjectName objectName = new ObjectName(getSubscriptionObjectName(offlineConsumerInfo, info.getClientId(), brokerObjectName));
541                SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
542    
543                try {
544                    AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
545                    registeredMBeans.add(objectName);
546                } catch (Throwable e) {
547                    LOG.warn("Failed to register MBean: " + key);
548                    LOG.debug("Failure reason: " + e, e);
549                }
550    
551                inactiveDurableTopicSubscribers.put(objectName, view);
552                subscriptionKeys.put(key, objectName);
553            } catch (Exception e) {
554                LOG.error("Failed to register subscription " + info, e);
555            }
556        }
557    
558        public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
559            List<Message> messages = getSubscriberMessages(view);
560            CompositeData c[] = new CompositeData[messages.size()];
561            for (int i = 0; i < c.length; i++) {
562                try {
563                    c[i] = OpenTypeSupport.convert(messages.get(i));
564                } catch (Throwable e) {
565                    LOG.error("failed to browse : " + view, e);
566                }
567            }
568            return c;
569        }
570    
571        public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
572            OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
573            List<Message> messages = getSubscriberMessages(view);
574            CompositeType ct = factory.getCompositeType();
575            TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
576            TabularDataSupport rc = new TabularDataSupport(tt);
577            for (int i = 0; i < messages.size(); i++) {
578                rc.put(new CompositeDataSupport(ct, factory.getFields(messages.get(i))));
579            }
580            return rc;
581        }
582    
583        protected List<Message> getSubscriberMessages(SubscriptionView view) {
584            // TODO It is very dangerous operation for big backlogs
585            if (!(destinationFactory instanceof DestinationFactoryImpl)) {
586                throw new RuntimeException("unsupported by " + destinationFactory);
587            }
588            PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
589            final List<Message> result = new ArrayList<Message>();
590            try {
591                ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
592                TopicMessageStore store = adapter.createTopicMessageStore(topic);
593                store.recover(new MessageRecoveryListener() {
594                    public boolean recoverMessage(Message message) throws Exception {
595                        result.add(message);
596                        return true;
597                    }
598    
599                    public boolean recoverMessageReference(MessageId messageReference) throws Exception {
600                        throw new RuntimeException("Should not be called.");
601                    }
602    
603                    public boolean hasSpace() {
604                        return true;
605                    }
606    
607                    public boolean isDuplicate(MessageId id) {
608                        return false;
609                    }
610                });
611            } catch (Throwable e) {
612                LOG.error("Failed to browse messages for Subscription " + view, e);
613            }
614            return result;
615    
616        }
617    
618        protected ObjectName[] getTopics() {
619            Set<ObjectName> set = topics.keySet();
620            return set.toArray(new ObjectName[set.size()]);
621        }
622    
623        protected ObjectName[] getQueues() {
624            Set<ObjectName> set = queues.keySet();
625            return set.toArray(new ObjectName[set.size()]);
626        }
627    
628        protected ObjectName[] getTemporaryTopics() {
629            Set<ObjectName> set = temporaryTopics.keySet();
630            return set.toArray(new ObjectName[set.size()]);
631        }
632    
633        protected ObjectName[] getTemporaryQueues() {
634            Set<ObjectName> set = temporaryQueues.keySet();
635            return set.toArray(new ObjectName[set.size()]);
636        }
637    
638        protected ObjectName[] getTopicSubscribers() {
639            Set<ObjectName> set = topicSubscribers.keySet();
640            return set.toArray(new ObjectName[set.size()]);
641        }
642    
643        protected ObjectName[] getDurableTopicSubscribers() {
644            Set<ObjectName> set = durableTopicSubscribers.keySet();
645            return set.toArray(new ObjectName[set.size()]);
646        }
647    
648        protected ObjectName[] getQueueSubscribers() {
649            Set<ObjectName> set = queueSubscribers.keySet();
650            return set.toArray(new ObjectName[set.size()]);
651        }
652    
653        protected ObjectName[] getTemporaryTopicSubscribers() {
654            Set<ObjectName> set = temporaryTopicSubscribers.keySet();
655            return set.toArray(new ObjectName[set.size()]);
656        }
657    
658        protected ObjectName[] getTemporaryQueueSubscribers() {
659            Set<ObjectName> set = temporaryQueueSubscribers.keySet();
660            return set.toArray(new ObjectName[set.size()]);
661        }
662    
663        protected ObjectName[] getInactiveDurableTopicSubscribers() {
664            Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
665            return set.toArray(new ObjectName[set.size()]);
666        }
667    
668        protected ObjectName[] getTopicProducers() {
669            Set<ObjectName> set = topicProducers.keySet();
670            return set.toArray(new ObjectName[set.size()]);
671        }
672    
673        protected ObjectName[] getQueueProducers() {
674            Set<ObjectName> set = queueProducers.keySet();
675            return set.toArray(new ObjectName[set.size()]);
676        }
677    
678        protected ObjectName[] getTemporaryTopicProducers() {
679            Set<ObjectName> set = temporaryTopicProducers.keySet();
680            return set.toArray(new ObjectName[set.size()]);
681        }
682    
683        protected ObjectName[] getTemporaryQueueProducers() {
684            Set<ObjectName> set = temporaryQueueProducers.keySet();
685            return set.toArray(new ObjectName[set.size()]);
686        }
687    
688        protected ObjectName[] getDynamicDestinationProducers() {
689            Set<ObjectName> set = dynamicDestinationProducers.keySet();
690            return set.toArray(new ObjectName[set.size()]);
691        }
692    
693        public Broker getContextBroker() {
694            return contextBroker;
695        }
696    
697        public void setContextBroker(Broker contextBroker) {
698            this.contextBroker = contextBroker;
699        }
700    
701        protected ObjectName createObjectName(ActiveMQDestination destName) throws MalformedObjectNameException {
702            // Build the object name for the destination
703            Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
704            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + "," + "Type="
705                                                   + JMXSupport.encodeObjectNamePart(destName.getDestinationTypeAsString()) + "," + "Destination="
706                                                   + JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
707            return objectName;
708        }
709    
710        protected ObjectName createObjectName(ProducerInfo producerInfo, String connectionClientId) throws MalformedObjectNameException {
711            // Build the object name for the producer info
712            Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
713    
714            String destinationType = "destinationType=";
715            String destinationName = "destinationName=";
716    
717            if (producerInfo.getDestination() == null) {
718                destinationType += "Dynamic";
719                destinationName = null;
720            } else {
721                destinationType += producerInfo.getDestination().getDestinationTypeAsString();
722                destinationName += JMXSupport.encodeObjectNamePart(producerInfo.getDestination().getPhysicalName());
723            }
724    
725            String clientId = "clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
726            String producerId = "producerId=" + JMXSupport.encodeObjectNamePart(producerInfo.getProducerId().toString());
727    
728            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
729                                                   + "Type=Producer" + ","
730                                                   + destinationType + ","
731                                                   + (destinationName != null ? destinationName + "," : "")
732                                                   + clientId + "," + producerId);
733            return objectName;
734        }
735    
736        public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
737            ObjectName objectName = null;
738            try {
739                objectName = createObjectName(strategy);
740                if (!registeredMBeans.contains(objectName))  {
741                    AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
742                    AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
743                    registeredMBeans.add(objectName);
744                }
745            } catch (Exception e) {
746                LOG.warn("Failed to register MBean: " + strategy);
747                LOG.debug("Failure reason: " + e, e);
748            }
749            return objectName;
750        }
751    
752        protected ObjectName createObjectName(XATransaction transaction) throws MalformedObjectNameException {
753            Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
754            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName")
755                                                   + "," + "Type=RecoveredXaTransaction"
756                                                   + "," + "Xid="
757                                                   + JMXSupport.encodeObjectNamePart(transaction.getTransactionId().toString()));
758            return objectName;
759        }
760    
761        public void registerRecoveredTransactionMBean(XATransaction transaction) {
762            try {
763                ObjectName objectName = createObjectName(transaction);
764                if (!registeredMBeans.contains(objectName))  {
765                    RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
766                    AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
767                    registeredMBeans.add(objectName);
768                }
769            } catch (Exception e) {
770                LOG.warn("Failed to register prepared transaction MBean: " + transaction);
771                LOG.debug("Failure reason: " + e, e);
772            }
773        }
774    
775        public void unregister(XATransaction transaction) {
776            try {
777                ObjectName objectName = createObjectName(transaction);
778                if (registeredMBeans.remove(objectName)) {
779                    try {
780                        managementContext.unregisterMBean(objectName);
781                    } catch (Throwable e) {
782                        LOG.warn("Failed to unregister MBean: " + objectName);
783                        LOG.debug("Failure reason: " + e, e);
784                    }
785                }
786            } catch (Exception e) {
787                LOG.warn("Failed to create object name to unregister " + transaction, e);
788            }
789        }
790    
791        private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
792            Hashtable<String, String> map = brokerObjectName.getKeyPropertyList();
793            ObjectName objectName = new ObjectName(brokerObjectName.getDomain() + ":" + "BrokerName=" + map.get("BrokerName") + ","
794                                + "Type=SlowConsumerStrategy," + "InstanceName=" + JMXSupport.encodeObjectNamePart(strategy.getName()));
795            return objectName;
796        }
797    
798        public ObjectName getSubscriberObjectName(Subscription key) {
799            return subscriptionMap.get(key);
800        }
801    
802        public Subscription getSubscriber(ObjectName key) {
803            Subscription sub = null;
804            for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
805                if (entry.getValue().equals(key)) {
806                    sub = entry.getKey();
807                    break;
808                }
809            }
810            return sub;
811        }
812    }