001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.plugin;
018
019import java.io.File;
020import java.net.URI;
021import java.util.Set;
022
023import javax.jms.JMSException;
024import javax.management.ObjectName;
025
026import org.apache.activemq.advisory.AdvisorySupport;
027import org.apache.activemq.broker.Broker;
028import org.apache.activemq.broker.BrokerFilter;
029import org.apache.activemq.broker.BrokerService;
030import org.apache.activemq.broker.ConnectionContext;
031import org.apache.activemq.broker.ProducerBrokerExchange;
032import org.apache.activemq.broker.jmx.BrokerViewMBean;
033import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
034import org.apache.activemq.broker.region.Destination;
035import org.apache.activemq.broker.region.DestinationStatistics;
036import org.apache.activemq.broker.region.RegionBroker;
037import org.apache.activemq.command.ActiveMQDestination;
038import org.apache.activemq.command.ActiveMQMapMessage;
039import org.apache.activemq.command.Message;
040import org.apache.activemq.command.MessageId;
041import org.apache.activemq.command.ProducerId;
042import org.apache.activemq.command.ProducerInfo;
043import org.apache.activemq.state.ProducerState;
044import org.apache.activemq.usage.SystemUsage;
045import org.apache.activemq.util.IdGenerator;
046import org.apache.activemq.util.LongSequenceGenerator;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049/**
050 * A StatisticsBroker You can retrieve a Map Message for a Destination - or
051 * Broker containing statistics as key-value pairs The message must contain a
052 * replyTo Destination - else its ignored
053 *
054 */
055public class StatisticsBroker extends BrokerFilter {
056    private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
057    static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
058    static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
059    static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset";
060    static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
061    static final String STATS_DENOTE_END_LIST = STATS_DESTINATION_PREFIX + ".List.End.With.Null";
062    private static final IdGenerator ID_GENERATOR = new IdGenerator();
063    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
064    protected final ProducerId advisoryProducerId = new ProducerId();
065    protected BrokerViewMBean brokerView;
066
067    /**
068     *
069     * Constructor
070     *
071     * @param next
072     */
073    public StatisticsBroker(Broker next) {
074        super(next);
075        this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
076    }
077
078    /**
079     * Sets the persistence mode
080     *
081     * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
082     *      org.apache.activemq.command.Message)
083     */
084    @Override
085    public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
086        ActiveMQDestination msgDest = messageSend.getDestination();
087        ActiveMQDestination replyTo = messageSend.getReplyTo();
088        if (replyTo != null) {
089            String physicalName = msgDest.getPhysicalName();
090            boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0,
091                    STATS_DESTINATION_PREFIX.length());
092            boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
093                    .length());
094            boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX
095                    .length());
096            BrokerService brokerService = getBrokerService();
097            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
098            if (destStats) {
099                String destinationName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
100                if (destinationName.startsWith(".")) {
101                    destinationName = destinationName.substring(1);
102                }
103                String destinationQuery = destinationName.replace(STATS_DENOTE_END_LIST,"");
104                boolean endListMessage = !destinationName.equals(destinationQuery);
105                ActiveMQDestination queryDestination = ActiveMQDestination.createDestination(destinationQuery,msgDest.getDestinationType());
106                Set<Destination> destinations = getDestinations(queryDestination);
107
108                for (Destination dest : destinations) {
109                    DestinationStatistics stats = dest.getDestinationStatistics();
110                    if (stats != null) {
111                        ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
112                        statsMessage.setString("brokerName", regionBroker.getBrokerName());
113                        statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
114                        statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
115                        statsMessage.setLong("size", stats.getMessages().getCount());
116                        statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
117                        statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
118                        statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
119                        statsMessage.setLong("expiredCount", stats.getExpired().getCount());
120                        statsMessage.setLong("inflightCount", stats.getInflight().getCount());
121                        statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
122                        // we are okay with the size without decimals so cast to long
123                        statsMessage.setLong("averageMessageSize", (long) stats.getMessageSize().getAveragePerSecond());
124                        statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
125                        statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
126                        statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
127                        statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
128                        statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
129                        statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
130                        statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
131                        statsMessage.setLong("producerCount", stats.getProducers().getCount());
132                        statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
133                        sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
134                    }
135                }
136                if(endListMessage){
137                    ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
138                    statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
139                    sendStats(producerExchange.getConnectionContext(),statsMessage,replyTo);
140                }
141
142            } else if (subStats) {
143                sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
144                sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
145            } else if (brokerStats) {
146
147                if (messageSend.getProperties().containsKey(STATS_BROKER_RESET_HEADER)) {
148                    getBrokerView().resetStatistics();
149                }
150
151                ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
152                SystemUsage systemUsage = brokerService.getSystemUsage();
153                DestinationStatistics stats = regionBroker.getDestinationStatistics();
154                statsMessage.setString("brokerName", regionBroker.getBrokerName());
155                statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
156                statsMessage.setLong("size", stats.getMessages().getCount());
157                statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
158                statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
159                statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
160                statsMessage.setLong("expiredCount", stats.getExpired().getCount());
161                statsMessage.setLong("inflightCount", stats.getInflight().getCount());
162                // we are okay with the size without decimals so cast to long
163                statsMessage.setLong("averageMessageSize",(long) stats.getMessageSize().getAverageSize());
164                statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
165                statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
166                statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
167                statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
168                statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
169                statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
170                statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
171                statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
172                statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
173                statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
174                statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
175                statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
176                statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
177                statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
178                statsMessage.setLong("producerCount", stats.getProducers().getCount());
179                String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
180                answer = answer != null ? answer : "";
181                statsMessage.setString("openwire", answer);
182                answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
183                answer = answer != null ? answer : "";
184                statsMessage.setString("stomp", answer);
185                answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
186                answer = answer != null ? answer : "";
187                statsMessage.setString("ssl", answer);
188                answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
189                answer = answer != null ? answer : "";
190                statsMessage.setString("stomp+ssl", answer);
191                URI uri = brokerService.getVmConnectorURI();
192                answer = uri != null ? uri.toString() : "";
193                statsMessage.setString("vm", answer);
194                File file = brokerService.getDataDirectoryFile();
195                answer = file != null ? file.getCanonicalPath() : "";
196                statsMessage.setString("dataDirectory", answer);
197                statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
198                sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
199            } else {
200                super.send(producerExchange, messageSend);
201            }
202        } else {
203            super.send(producerExchange, messageSend);
204        }
205    }
206
207    BrokerViewMBean getBrokerView() throws Exception {
208        if (this.brokerView == null) {
209            ObjectName brokerName = getBrokerService().getBrokerObjectName();
210            this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName,
211                    BrokerViewMBean.class, true);
212        }
213        return this.brokerView;
214    }
215
216    @Override
217    public void start() throws Exception {
218        super.start();
219        LOG.info("Starting StatisticsBroker");
220    }
221
222    @Override
223    public void stop() throws Exception {
224        super.stop();
225    }
226
227    protected void sendSubStats(ConnectionContext context, ObjectName[] subscribers, ActiveMQDestination replyTo) throws Exception {
228        for (int i = 0; i < subscribers.length; i++) {
229            ObjectName name = subscribers[i];
230            SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true);
231            ActiveMQMapMessage statsMessage = prepareSubscriptionMessage(subscriber);
232            sendStats(context, statsMessage, replyTo);
233        }
234    }
235
236    protected ActiveMQMapMessage prepareSubscriptionMessage(SubscriptionViewMBean subscriber) throws JMSException {
237        Broker regionBroker = getBrokerService().getRegionBroker();
238        ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
239        statsMessage.setString("brokerName", regionBroker.getBrokerName());
240        statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
241        statsMessage.setString("destinationName", subscriber.getDestinationName());
242        statsMessage.setString("clientId", subscriber.getClientId());
243        statsMessage.setString("connectionId", subscriber.getConnectionId());
244        statsMessage.setLong("sessionId", subscriber.getSessionId());
245        statsMessage.setString("selector", subscriber.getSelector());
246        statsMessage.setLong("enqueueCounter", subscriber.getEnqueueCounter());
247        statsMessage.setLong("dequeueCounter", subscriber.getDequeueCounter());
248        statsMessage.setLong("dispatchedCounter", subscriber.getDispatchedCounter());
249        statsMessage.setLong("dispatchedQueueSize", subscriber.getDispatchedQueueSize());
250        statsMessage.setInt("prefetchSize", subscriber.getPrefetchSize());
251        statsMessage.setInt("maximumPendingMessageLimit", subscriber.getMaximumPendingMessageLimit());
252        statsMessage.setBoolean("exclusive", subscriber.isExclusive());
253        statsMessage.setBoolean("retroactive", subscriber.isRetroactive());
254        statsMessage.setBoolean("slowConsumer", subscriber.isSlowConsumer());
255        return statsMessage;
256    }
257
258    protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
259            throws Exception {
260        msg.setPersistent(false);
261        msg.setTimestamp(System.currentTimeMillis());
262        msg.setPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
263        msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
264        msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId()));
265        msg.setDestination(replyTo);
266        msg.setResponseRequired(false);
267        msg.setProducerId(this.advisoryProducerId);
268        boolean originalFlowControl = context.isProducerFlowControl();
269        final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
270        producerExchange.setConnectionContext(context);
271        producerExchange.setMutable(true);
272        producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
273        try {
274            context.setProducerFlowControl(false);
275            this.next.send(producerExchange, msg);
276        } finally {
277            context.setProducerFlowControl(originalFlowControl);
278        }
279    }
280
281}