001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.plugin;
018
019 import java.io.File;
020 import java.net.URI;
021 import java.util.Set;
022
023 import javax.jms.JMSException;
024 import javax.management.ObjectName;
025
026 import org.apache.activemq.advisory.AdvisorySupport;
027 import org.apache.activemq.broker.Broker;
028 import org.apache.activemq.broker.BrokerFilter;
029 import org.apache.activemq.broker.BrokerService;
030 import org.apache.activemq.broker.ConnectionContext;
031 import org.apache.activemq.broker.ProducerBrokerExchange;
032 import org.apache.activemq.broker.jmx.BrokerViewMBean;
033 import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
034 import org.apache.activemq.broker.region.Destination;
035 import org.apache.activemq.broker.region.DestinationStatistics;
036 import org.apache.activemq.broker.region.RegionBroker;
037 import org.apache.activemq.command.ActiveMQDestination;
038 import org.apache.activemq.command.ActiveMQMapMessage;
039 import org.apache.activemq.command.Message;
040 import org.apache.activemq.command.MessageId;
041 import org.apache.activemq.command.ProducerId;
042 import org.apache.activemq.command.ProducerInfo;
043 import org.apache.activemq.state.ProducerState;
044 import org.apache.activemq.usage.SystemUsage;
045 import org.apache.activemq.util.IdGenerator;
046 import org.apache.activemq.util.LongSequenceGenerator;
047 import org.slf4j.Logger;
048 import org.slf4j.LoggerFactory;
049 /**
050 * A StatisticsBroker You can retrieve a Map Message for a Destination - or
051 * Broker containing statistics as key-value pairs The message must contain a
052 * replyTo Destination - else its ignored
053 *
054 */
055 public class StatisticsBroker extends BrokerFilter {
056 private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
057 static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
058 static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
059 static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset";
060 static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
061 private static final IdGenerator ID_GENERATOR = new IdGenerator();
062 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
063 protected final ProducerId advisoryProducerId = new ProducerId();
064 protected BrokerViewMBean brokerView;
065
066 /**
067 *
068 * Constructor
069 *
070 * @param next
071 */
072 public StatisticsBroker(Broker next) {
073 super(next);
074 this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
075 }
076
077 /**
078 * Sets the persistence mode
079 *
080 * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
081 * org.apache.activemq.command.Message)
082 */
083 public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
084 ActiveMQDestination msgDest = messageSend.getDestination();
085 ActiveMQDestination replyTo = messageSend.getReplyTo();
086 if (replyTo != null) {
087 String physicalName = msgDest.getPhysicalName();
088 boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0,
089 STATS_DESTINATION_PREFIX.length());
090 boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
091 .length());
092 boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX
093 .length());
094 BrokerService brokerService = getBrokerService();
095 RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
096 if (destStats) {
097 String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
098 ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType());
099 Set<Destination> set = getDestinations(queryDest);
100 for (Destination dest : set) {
101 DestinationStatistics stats = dest.getDestinationStatistics();
102 if (stats != null) {
103 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
104 statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
105 statsMessage.setLong("size", stats.getMessages().getCount());
106 statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
107 statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
108 statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
109 statsMessage.setLong("expiredCount", stats.getExpired().getCount());
110 statsMessage.setLong("inflightCount", stats.getInflight().getCount());
111 statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
112 statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
113 statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
114 statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
115 statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
116 statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
117 statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
118 statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
119 statsMessage.setLong("producerCount", stats.getProducers().getCount());
120 statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
121 sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
122 }
123 }
124 } else if (subStats) {
125 sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
126 sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
127 } else if (brokerStats) {
128
129 if (messageSend.getProperties().containsKey(STATS_BROKER_RESET_HEADER)) {
130 getBrokerView().resetStatistics();
131 }
132
133 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
134 SystemUsage systemUsage = brokerService.getSystemUsage();
135 DestinationStatistics stats = regionBroker.getDestinationStatistics();
136 statsMessage.setString("brokerName", regionBroker.getBrokerName());
137 statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
138 statsMessage.setLong("size", stats.getMessages().getCount());
139 statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
140 statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
141 statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
142 statsMessage.setLong("expiredCount", stats.getExpired().getCount());
143 statsMessage.setLong("inflightCount", stats.getInflight().getCount());
144 statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
145 statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
146 statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
147 statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
148 statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
149 statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
150 statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
151 statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
152 statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
153 statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
154 statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
155 statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
156 statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
157 statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
158 statsMessage.setLong("producerCount", stats.getProducers().getCount());
159 String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
160 answer = answer != null ? answer : "";
161 statsMessage.setString("openwire", answer);
162 answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
163 answer = answer != null ? answer : "";
164 statsMessage.setString("stomp", answer);
165 answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
166 answer = answer != null ? answer : "";
167 statsMessage.setString("ssl", answer);
168 answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
169 answer = answer != null ? answer : "";
170 statsMessage.setString("stomp+ssl", answer);
171 URI uri = brokerService.getVmConnectorURI();
172 answer = uri != null ? uri.toString() : "";
173 statsMessage.setString("vm", answer);
174 File file = brokerService.getDataDirectoryFile();
175 answer = file != null ? file.getCanonicalPath() : "";
176 statsMessage.setString("dataDirectory", answer);
177 statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
178 sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
179 } else {
180 super.send(producerExchange, messageSend);
181 }
182 } else {
183 super.send(producerExchange, messageSend);
184 }
185 }
186
187 BrokerViewMBean getBrokerView() throws Exception {
188 if (this.brokerView == null) {
189 ObjectName brokerName = getBrokerService().getBrokerObjectName();
190 this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName,
191 BrokerViewMBean.class, true);
192 }
193 return this.brokerView;
194 }
195
196 public void start() throws Exception {
197 super.start();
198 LOG.info("Starting StatisticsBroker");
199 }
200
201 public void stop() throws Exception {
202 super.stop();
203 }
204
205 protected void sendSubStats(ConnectionContext context, ObjectName[] subscribers, ActiveMQDestination replyTo) throws Exception {
206 for (int i = 0; i < subscribers.length; i++) {
207 ObjectName name = subscribers[i];
208 SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true);
209 ActiveMQMapMessage statsMessage = prepareSubscriptionMessage(subscriber);
210 sendStats(context, statsMessage, replyTo);
211 }
212 }
213
214 protected ActiveMQMapMessage prepareSubscriptionMessage(SubscriptionViewMBean subscriber) throws JMSException {
215 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
216 statsMessage.setString("destinationName", subscriber.getDestinationName());
217 statsMessage.setString("clientId", subscriber.getClientId());
218 statsMessage.setString("connectionId", subscriber.getConnectionId());
219 statsMessage.setLong("sessionId", subscriber.getSessionId());
220 statsMessage.setString("selector", subscriber.getSelector());
221 statsMessage.setLong("enqueueCounter", subscriber.getEnqueueCounter());
222 statsMessage.setLong("dequeueCounter", subscriber.getDequeueCounter());
223 statsMessage.setLong("dispatchedCounter", subscriber.getDispatchedCounter());
224 statsMessage.setLong("dispatchedQueueSize", subscriber.getDispatchedQueueSize());
225 statsMessage.setInt("prefetchSize", subscriber.getPrefetchSize());
226 statsMessage.setInt("maximumPendingMessageLimit", subscriber.getMaximumPendingMessageLimit());
227 statsMessage.setBoolean("exclusive", subscriber.isExclusive());
228 statsMessage.setBoolean("retroactive", subscriber.isRetroactive());
229 statsMessage.setBoolean("slowConsumer", subscriber.isSlowConsumer());
230 return statsMessage;
231 }
232
233 protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
234 throws Exception {
235 msg.setPersistent(false);
236 msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
237 msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId()));
238 msg.setDestination(replyTo);
239 msg.setResponseRequired(false);
240 msg.setProducerId(this.advisoryProducerId);
241 boolean originalFlowControl = context.isProducerFlowControl();
242 final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
243 producerExchange.setConnectionContext(context);
244 producerExchange.setMutable(true);
245 producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
246 try {
247 context.setProducerFlowControl(false);
248 this.next.send(producerExchange, msg);
249 } finally {
250 context.setProducerFlowControl(originalFlowControl);
251 }
252 }
253 }