001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import org.apache.activemq.broker.region.Subscription; 020import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; 021import org.apache.activemq.broker.region.policy.SlowConsumerEntry; 022import org.slf4j.Logger; 023import org.slf4j.LoggerFactory; 024 025import javax.management.ObjectName; 026import javax.management.openmbean.CompositeType; 027import javax.management.openmbean.OpenDataException; 028import javax.management.openmbean.TabularData; 029import javax.management.openmbean.TabularDataSupport; 030import javax.management.openmbean.TabularType; 031import java.util.Map; 032 033public class AbortSlowConsumerStrategyView implements AbortSlowConsumerStrategyViewMBean { 034 private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumerStrategyView.class); 035 private ManagedRegionBroker broker; 036 private AbortSlowConsumerStrategy strategy; 037 038 039 public AbortSlowConsumerStrategyView(ManagedRegionBroker managedRegionBroker, AbortSlowConsumerStrategy slowConsumerStrategy) { 040 this.broker = managedRegionBroker; 041 this.strategy = slowConsumerStrategy; 042 } 043 044 public long getMaxSlowCount() { 045 return strategy.getMaxSlowCount(); 046 } 047 048 public void setMaxSlowCount(long maxSlowCount) { 049 strategy.setMaxSlowCount(maxSlowCount); 050 } 051 052 public long getMaxSlowDuration() { 053 return strategy.getMaxSlowDuration(); 054 } 055 056 public void setMaxSlowDuration(long maxSlowDuration) { 057 strategy.setMaxSlowDuration(maxSlowDuration); 058 } 059 060 public long getCheckPeriod() { 061 return strategy.getCheckPeriod(); 062 } 063 064 public TabularData getSlowConsumers() throws OpenDataException { 065 066 OpenTypeSupport.OpenTypeFactory factory = OpenTypeSupport.getFactory(SlowConsumerEntry.class); 067 CompositeType ct = factory.getCompositeType(); 068 TabularType tt = new TabularType("SlowConsumers", "Table of current slow Consumers", ct, new String[] {"subscription" }); 069 TabularDataSupport rc = new TabularDataSupport(tt); 070 071 int index = 0; 072 Map<Subscription, SlowConsumerEntry> slowConsumers = strategy.getSlowConsumers(); 073 for (Map.Entry<Subscription, SlowConsumerEntry> entry : slowConsumers.entrySet()) { 074 entry.getValue().setSubscription(broker.getSubscriberObjectName(entry.getKey())); 075 rc.put(OpenTypeSupport.convert(entry.getValue())); 076 } 077 return rc; 078 } 079 080 public void abortConsumer(ObjectName consumerToAbort) { 081 Subscription sub = broker.getSubscriber(consumerToAbort); 082 if (sub != null) { 083 LOG.info("aborting consumer via jmx: {}", sub.getConsumerInfo().getConsumerId()); 084 strategy.abortConsumer(sub, false); 085 } else { 086 LOG.warn("cannot resolve subscription matching name: {}", consumerToAbort); 087 } 088 089 } 090 091 public void abortConnection(ObjectName consumerToAbort) { 092 Subscription sub = broker.getSubscriber(consumerToAbort); 093 if (sub != null) { 094 LOG.info("aborting consumer connection via jmx: {}", sub.getConsumerInfo().getConsumerId().getConnectionId()); 095 strategy.abortConsumer(sub, true); 096 } else { 097 LOG.warn("cannot resolve subscription matching name: {}", consumerToAbort); 098 } 099 } 100 101 public void abortConsumer(String objectNameOfConsumerToAbort) { 102 abortConsumer(toObjectName(objectNameOfConsumerToAbort)); 103 } 104 105 public void abortConnection(String objectNameOfConsumerToAbort) { 106 abortConnection(toObjectName(objectNameOfConsumerToAbort)); 107 } 108 109 private ObjectName toObjectName(String objectName) { 110 ObjectName result = null; 111 try { 112 result = new ObjectName(objectName); 113 } catch (Exception e) { 114 LOG.warn("cannot create subscription ObjectName to abort, from string: {}", objectName); 115 } 116 return result; 117 } 118}