001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.plugin;
018    
019    import java.io.File;
020    import java.net.URI;
021    import java.util.Set;
022    
023    import javax.jms.JMSException;
024    import javax.management.ObjectName;
025    import org.apache.activemq.advisory.AdvisorySupport;
026    import org.apache.activemq.broker.Broker;
027    import org.apache.activemq.broker.BrokerFilter;
028    import org.apache.activemq.broker.BrokerService;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.broker.ProducerBrokerExchange;
031    import org.apache.activemq.broker.jmx.BrokerViewMBean;
032    import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
033    import org.apache.activemq.broker.region.Destination;
034    import org.apache.activemq.broker.region.DestinationStatistics;
035    import org.apache.activemq.broker.region.RegionBroker;
036    import org.apache.activemq.command.ActiveMQDestination;
037    import org.apache.activemq.command.ActiveMQMapMessage;
038    import org.apache.activemq.command.Message;
039    import org.apache.activemq.command.MessageId;
040    import org.apache.activemq.command.ProducerId;
041    import org.apache.activemq.command.ProducerInfo;
042    import org.apache.activemq.state.ProducerState;
043    import org.apache.activemq.usage.SystemUsage;
044    import org.apache.activemq.util.IdGenerator;
045    import org.apache.activemq.util.LongSequenceGenerator;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    /**
049     * A StatisticsBroker You can retrieve a Map Message for a Destination - or
050     * Broker containing statistics as key-value pairs The message must contain a
051     * replyTo Destination - else its ignored
052     *
053     */
054    public class StatisticsBroker extends BrokerFilter {
055        private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
056        static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
057        static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
058        static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset";
059        static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
060        static final String STATS_DENOTE_END_LIST = STATS_DESTINATION_PREFIX + ".List.End.With.Null";
061        private static final IdGenerator ID_GENERATOR = new IdGenerator();
062        private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
063        protected final ProducerId advisoryProducerId = new ProducerId();
064        protected BrokerViewMBean brokerView;
065    
066        /**
067         *
068         * Constructor
069         *
070         * @param next
071         */
072        public StatisticsBroker(Broker next) {
073            super(next);
074            this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
075        }
076    
077        /**
078         * Sets the persistence mode
079         *
080         * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
081         *      org.apache.activemq.command.Message)
082         */
083        public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
084            ActiveMQDestination msgDest = messageSend.getDestination();
085            ActiveMQDestination replyTo = messageSend.getReplyTo();
086            if (replyTo != null) {
087                String physicalName = msgDest.getPhysicalName();
088                boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0,
089                        STATS_DESTINATION_PREFIX.length());
090                boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
091                        .length());
092                boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX
093                        .length());
094                BrokerService brokerService = getBrokerService();
095                RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
096                if (destStats) {
097                    String destinationName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
098                    String destinationQuery = destinationName.replace(STATS_DENOTE_END_LIST,"");
099                    boolean endListMessage = !destinationName.equals(destinationQuery);
100                    ActiveMQDestination queryDestination = ActiveMQDestination.createDestination(destinationQuery,msgDest.getDestinationType());
101                    Set<Destination> destinations = getDestinations(queryDestination);
102    
103                    for (Destination dest : destinations) {
104                        DestinationStatistics stats = dest.getDestinationStatistics();
105                        if (stats != null) {
106                            ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
107                            statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
108                            statsMessage.setLong("size", stats.getMessages().getCount());
109                            statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
110                            statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
111                            statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
112                            statsMessage.setLong("expiredCount", stats.getExpired().getCount());
113                            statsMessage.setLong("inflightCount", stats.getInflight().getCount());
114                            statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
115                            statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
116                            statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
117                            statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
118                            statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
119                            statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
120                            statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
121                            statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
122                            statsMessage.setLong("producerCount", stats.getProducers().getCount());
123                            statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
124                            sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
125                        }
126                    }
127                    if(endListMessage){
128                        ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
129                        statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
130                        sendStats(producerExchange.getConnectionContext(),statsMessage,replyTo);
131                    }
132    
133                } else if (subStats) {
134                    sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
135                    sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
136                } else if (brokerStats) {
137    
138                    if (messageSend.getProperties().containsKey(STATS_BROKER_RESET_HEADER)) {
139                        getBrokerView().resetStatistics();
140                    }
141    
142                    ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
143                    SystemUsage systemUsage = brokerService.getSystemUsage();
144                    DestinationStatistics stats = regionBroker.getDestinationStatistics();
145                    statsMessage.setString("brokerName", regionBroker.getBrokerName());
146                    statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
147                    statsMessage.setLong("size", stats.getMessages().getCount());
148                    statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
149                    statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
150                    statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
151                    statsMessage.setLong("expiredCount", stats.getExpired().getCount());
152                    statsMessage.setLong("inflightCount", stats.getInflight().getCount());
153                    statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
154                    statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
155                    statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
156                    statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
157                    statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
158                    statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
159                    statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
160                    statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
161                    statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
162                    statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
163                    statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
164                    statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
165                    statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
166                    statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
167                    statsMessage.setLong("producerCount", stats.getProducers().getCount());
168                    String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
169                    answer = answer != null ? answer : "";
170                    statsMessage.setString("openwire", answer);
171                    answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
172                    answer = answer != null ? answer : "";
173                    statsMessage.setString("stomp", answer);
174                    answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
175                    answer = answer != null ? answer : "";
176                    statsMessage.setString("ssl", answer);
177                    answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
178                    answer = answer != null ? answer : "";
179                    statsMessage.setString("stomp+ssl", answer);
180                    URI uri = brokerService.getVmConnectorURI();
181                    answer = uri != null ? uri.toString() : "";
182                    statsMessage.setString("vm", answer);
183                    File file = brokerService.getDataDirectoryFile();
184                    answer = file != null ? file.getCanonicalPath() : "";
185                    statsMessage.setString("dataDirectory", answer);
186                    statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
187                    sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
188                } else {
189                    super.send(producerExchange, messageSend);
190                }
191            } else {
192                super.send(producerExchange, messageSend);
193            }
194        }
195    
196        BrokerViewMBean getBrokerView() throws Exception {
197            if (this.brokerView == null) {
198                ObjectName brokerName = getBrokerService().getBrokerObjectName();
199                this.brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerName,
200                        BrokerViewMBean.class, true);
201            }
202            return this.brokerView;
203        }
204    
205        public void start() throws Exception {
206            super.start();
207            LOG.info("Starting StatisticsBroker");
208        }
209    
210        public void stop() throws Exception {
211            super.stop();
212        }
213    
214        protected void sendSubStats(ConnectionContext context, ObjectName[] subscribers, ActiveMQDestination replyTo) throws Exception {
215            for (int i = 0; i < subscribers.length; i++) {
216                ObjectName name = subscribers[i];
217                SubscriptionViewMBean subscriber = (SubscriptionViewMBean)getBrokerService().getManagementContext().newProxyInstance(name, SubscriptionViewMBean.class, true);
218                ActiveMQMapMessage statsMessage = prepareSubscriptionMessage(subscriber);
219                sendStats(context, statsMessage, replyTo);
220            }
221        }
222    
223        protected ActiveMQMapMessage prepareSubscriptionMessage(SubscriptionViewMBean subscriber) throws JMSException {
224            ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
225            statsMessage.setString("destinationName", subscriber.getDestinationName());
226            statsMessage.setString("clientId", subscriber.getClientId());
227            statsMessage.setString("connectionId", subscriber.getConnectionId());
228            statsMessage.setLong("sessionId", subscriber.getSessionId());
229            statsMessage.setString("selector", subscriber.getSelector());
230            statsMessage.setLong("enqueueCounter", subscriber.getEnqueueCounter());
231            statsMessage.setLong("dequeueCounter", subscriber.getDequeueCounter());
232            statsMessage.setLong("dispatchedCounter", subscriber.getDispatchedCounter());
233            statsMessage.setLong("dispatchedQueueSize", subscriber.getDispatchedQueueSize());
234            statsMessage.setInt("prefetchSize", subscriber.getPrefetchSize());
235            statsMessage.setInt("maximumPendingMessageLimit", subscriber.getMaximumPendingMessageLimit());
236            statsMessage.setBoolean("exclusive", subscriber.isExclusive());
237            statsMessage.setBoolean("retroactive", subscriber.isRetroactive());
238            statsMessage.setBoolean("slowConsumer", subscriber.isSlowConsumer());
239            return statsMessage;
240        }
241    
242        protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
243                throws Exception {
244            msg.setPersistent(false);
245            msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
246            msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId()));
247            msg.setDestination(replyTo);
248            msg.setResponseRequired(false);
249            msg.setProducerId(this.advisoryProducerId);
250            boolean originalFlowControl = context.isProducerFlowControl();
251            final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
252            producerExchange.setConnectionContext(context);
253            producerExchange.setMutable(true);
254            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
255            try {
256                context.setProducerFlowControl(false);
257                this.next.send(producerExchange, msg);
258            } finally {
259                context.setProducerFlowControl(originalFlowControl);
260            }
261        }
262    
263    }