001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.lang.reflect.InvocationTargetException;
022    import java.lang.reflect.Method;
023    import java.net.MalformedURLException;
024    import java.net.URI;
025    import java.net.URL;
026    import java.util.HashMap;
027    import java.util.Map;
028    import java.util.NoSuchElementException;
029    import java.util.concurrent.atomic.AtomicInteger;
030    import javax.management.ObjectName;
031    
032    import org.apache.activemq.ActiveMQConnectionMetaData;
033    import org.apache.activemq.broker.BrokerService;
034    import org.apache.activemq.broker.ConnectionContext;
035    import org.apache.activemq.broker.TransportConnector;
036    import org.apache.activemq.broker.region.Subscription;
037    import org.apache.activemq.command.ActiveMQQueue;
038    import org.apache.activemq.command.ActiveMQTopic;
039    import org.apache.activemq.command.ConsumerId;
040    import org.apache.activemq.command.ConsumerInfo;
041    import org.apache.activemq.command.RemoveSubscriptionInfo;
042    import org.apache.activemq.network.NetworkConnector;
043    import org.apache.activemq.util.BrokerSupport;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    /**
048     *
049     */
050    public class BrokerView implements BrokerViewMBean {
051        private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class);
052        ManagedRegionBroker broker;
053        private final BrokerService brokerService;
054        private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
055        private ObjectName jmsJobScheduler;
056    
057        public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
058            this.brokerService = brokerService;
059            this.broker = managedBroker;
060        }
061    
062        public ManagedRegionBroker getBroker() {
063            return broker;
064        }
065    
066        public void setBroker(ManagedRegionBroker broker) {
067            this.broker = broker;
068        }
069    
070        public String getBrokerId() {
071            return safeGetBroker().getBrokerId().toString();
072        }
073    
074        public String getBrokerName() {
075            return safeGetBroker().getBrokerName();
076        }
077    
078        public String getBrokerVersion() {
079            return ActiveMQConnectionMetaData.PROVIDER_VERSION;
080        }
081    
082        @Override
083        public String getUptime() {
084            return brokerService.getUptime();
085        }
086    
087        public void gc() throws Exception {
088            brokerService.getBroker().gc();
089            try {
090                brokerService.getPersistenceAdapter().checkpoint(true);
091            } catch (IOException e) {
092                LOG.error("Failed to checkpoint persistence adapter on gc request, reason:" + e, e);
093            }
094        }
095    
096        public void start() throws Exception {
097            brokerService.start();
098        }
099    
100        public void stop() throws Exception {
101            brokerService.stop();
102        }
103    
104        public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
105                throws Exception {
106            brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
107        }
108    
109        public long getTotalEnqueueCount() {
110            return safeGetBroker().getDestinationStatistics().getEnqueues().getCount();
111        }
112    
113        public long getTotalDequeueCount() {
114            return safeGetBroker().getDestinationStatistics().getDequeues().getCount();
115        }
116    
117        public long getTotalConsumerCount() {
118            return safeGetBroker().getDestinationStatistics().getConsumers().getCount();
119        }
120    
121        public long getTotalProducerCount() {
122            return safeGetBroker().getDestinationStatistics().getProducers().getCount();
123        }
124    
125        public long getTotalMessageCount() {
126            return safeGetBroker().getDestinationStatistics().getMessages().getCount();
127        }
128    
129        public long getTotalMessagesCached() {
130            return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
131        }
132    
133        public int getMemoryPercentUsage() {
134            return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
135        }
136    
137        public long getMemoryLimit() {
138            return brokerService.getSystemUsage().getMemoryUsage().getLimit();
139        }
140    
141        public void setMemoryLimit(long limit) {
142            brokerService.getSystemUsage().getMemoryUsage().setLimit(limit);
143        }
144    
145        public long getStoreLimit() {
146            return brokerService.getSystemUsage().getStoreUsage().getLimit();
147        }
148    
149        public int getStorePercentUsage() {
150            return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
151        }
152    
153        public long getTempLimit() {
154           return brokerService.getSystemUsage().getTempUsage().getLimit();
155        }
156    
157        public int getTempPercentUsage() {
158           return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
159        }
160    
161        public void setStoreLimit(long limit) {
162            brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
163        }
164    
165        public void setTempLimit(long limit) {
166            brokerService.getSystemUsage().getTempUsage().setLimit(limit);
167        }
168    
169        public void resetStatistics() {
170            safeGetBroker().getDestinationStatistics().reset();
171        }
172    
173        public void enableStatistics() {
174            safeGetBroker().getDestinationStatistics().setEnabled(true);
175        }
176    
177        public void disableStatistics() {
178            safeGetBroker().getDestinationStatistics().setEnabled(false);
179        }
180    
181        public boolean isStatisticsEnabled() {
182            return safeGetBroker().getDestinationStatistics().isEnabled();
183        }
184    
185        public boolean isPersistent() {
186            return brokerService.isPersistent();
187        }
188    
189        public boolean isSlave() {
190            return brokerService.isSlave();
191        }
192    
193        public void terminateJVM(int exitCode) {
194            System.exit(exitCode);
195        }
196    
197        public ObjectName[] getTopics() {
198            return safeGetBroker().getTopics();
199        }
200    
201        public ObjectName[] getQueues() {
202            return safeGetBroker().getQueues();
203        }
204    
205        public ObjectName[] getTemporaryTopics() {
206            return safeGetBroker().getTemporaryTopics();
207        }
208    
209        public ObjectName[] getTemporaryQueues() {
210            return safeGetBroker().getTemporaryQueues();
211        }
212    
213        public ObjectName[] getTopicSubscribers() {
214            return safeGetBroker().getTopicSubscribers();
215        }
216    
217        public ObjectName[] getDurableTopicSubscribers() {
218            return safeGetBroker().getDurableTopicSubscribers();
219        }
220    
221        public ObjectName[] getQueueSubscribers() {
222            return safeGetBroker().getQueueSubscribers();
223        }
224    
225        public ObjectName[] getTemporaryTopicSubscribers() {
226            return safeGetBroker().getTemporaryTopicSubscribers();
227        }
228    
229        public ObjectName[] getTemporaryQueueSubscribers() {
230            return safeGetBroker().getTemporaryQueueSubscribers();
231        }
232    
233        public ObjectName[] getInactiveDurableTopicSubscribers() {
234            return safeGetBroker().getInactiveDurableTopicSubscribers();
235        }
236    
237        public ObjectName[] getTopicProducers() {
238            return safeGetBroker().getTopicProducers();
239        }
240    
241        public ObjectName[] getQueueProducers() {
242            return safeGetBroker().getQueueProducers();
243        }
244    
245        public ObjectName[] getTemporaryTopicProducers() {
246            return safeGetBroker().getTemporaryTopicProducers();
247        }
248    
249        public ObjectName[] getTemporaryQueueProducers() {
250            return safeGetBroker().getTemporaryQueueProducers();
251        }
252    
253        public ObjectName[] getDynamicDestinationProducers() {
254            return safeGetBroker().getDynamicDestinationProducers();
255        }
256    
257        public String addConnector(String discoveryAddress) throws Exception {
258            TransportConnector connector = brokerService.addConnector(discoveryAddress);
259            if (connector == null) {
260                throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
261            }
262            connector.start();
263            return connector.getName();
264        }
265    
266        public String addNetworkConnector(String discoveryAddress) throws Exception {
267            NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress);
268            if (connector == null) {
269                throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
270            }
271            connector.start();
272            return connector.getName();
273        }
274    
275        public boolean removeConnector(String connectorName) throws Exception {
276            TransportConnector connector = brokerService.getConnectorByName(connectorName);
277            if (connector == null) {
278                throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
279            }
280            connector.stop();
281            return brokerService.removeConnector(connector);
282        }
283    
284        public boolean removeNetworkConnector(String connectorName) throws Exception {
285            NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName);
286            if (connector == null) {
287                throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
288            }
289            connector.stop();
290            return brokerService.removeNetworkConnector(connector);
291        }
292    
293        public void addTopic(String name) throws Exception {
294            safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name),true);
295        }
296    
297        public void addQueue(String name) throws Exception {
298            safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name),true);
299        }
300    
301        public void removeTopic(String name) throws Exception {
302            safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000);
303        }
304    
305        public void removeQueue(String name) throws Exception {
306            safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000);
307        }
308    
309        public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName,
310                                                  String selector) throws Exception {
311            ConnectionContext context = new ConnectionContext();
312            context.setBroker(safeGetBroker());
313            context.setClientId(clientId);
314            ConsumerInfo info = new ConsumerInfo();
315            ConsumerId consumerId = new ConsumerId();
316            consumerId.setConnectionId(clientId);
317            consumerId.setSessionId(sessionIdCounter.incrementAndGet());
318            consumerId.setValue(0);
319            info.setConsumerId(consumerId);
320            info.setDestination(new ActiveMQTopic(topicName));
321            info.setSubscriptionName(subscriberName);
322            info.setSelector(selector);
323            Subscription subscription = safeGetBroker().addConsumer(context, info);
324            safeGetBroker().removeConsumer(context, info);
325            if (subscription != null) {
326                return subscription.getObjectName();
327            }
328            return null;
329        }
330    
331        public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception {
332            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
333            info.setClientId(clientId);
334            info.setSubscriptionName(subscriberName);
335            ConnectionContext context = new ConnectionContext();
336            context.setBroker(safeGetBroker());
337            context.setClientId(clientId);
338            safeGetBroker().removeSubscription(context, info);
339        }
340    
341        //  doc comment inherited from BrokerViewMBean
342        public void reloadLog4jProperties() throws Throwable {
343    
344            // Avoid a direct dependency on log4j.. use reflection.
345            try {
346                ClassLoader cl = getClass().getClassLoader();
347                Class<?> logManagerClass = cl.loadClass("org.apache.log4j.LogManager");
348    
349                Method resetConfiguration = logManagerClass.getMethod("resetConfiguration", new Class[]{});
350                resetConfiguration.invoke(null, new Object[]{});
351    
352                String configurationOptionStr = System.getProperty("log4j.configuration");
353                URL log4jprops = null;
354                if (configurationOptionStr != null) {
355                    try {
356                        log4jprops = new URL(configurationOptionStr);
357                    } catch (MalformedURLException ex) {
358                        log4jprops = cl.getResource("log4j.properties");
359                    }
360                } else {
361                   log4jprops = cl.getResource("log4j.properties");
362                }
363    
364                if (log4jprops != null) {
365                    Class<?> propertyConfiguratorClass = cl.loadClass("org.apache.log4j.PropertyConfigurator");
366                    Method configure = propertyConfiguratorClass.getMethod("configure", new Class[]{URL.class});
367                    configure.invoke(null, new Object[]{log4jprops});
368                }
369            } catch (InvocationTargetException e) {
370                throw e.getTargetException();
371            }
372        }
373    
374        public  Map<String, String> getTransportConnectors() {
375            Map<String, String> answer = new HashMap<String, String>();
376            try {
377                for (TransportConnector connector : brokerService.getTransportConnectors()) {
378                    answer.put(connector.getName(), connector.getConnectUri().toString());
379                }
380            } catch (Exception e) {
381                LOG.debug("Failed to read URI to build transport connectors map", e);
382            }
383            return answer;
384        }
385    
386        @Override
387        public String getTransportConnectorByType(String type) {
388            return brokerService.getTransportConnectorURIsAsMap().get(type);
389        }
390    
391        @Deprecated
392        /**
393         * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
394         */
395        public String getOpenWireURL() {
396            String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
397            return answer != null ? answer : "";
398        }
399    
400        @Deprecated
401        /**
402         * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
403         */
404        public String getStompURL() {
405            String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
406            return answer != null ? answer : "";
407        }
408    
409        @Deprecated
410        /**
411         * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
412         */
413        public String getSslURL() {
414            String answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
415            return answer != null ? answer : "";
416        }
417    
418        @Deprecated
419        /**
420         * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
421         */
422        public String getStompSslURL() {
423            String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
424            return answer != null ? answer : "";
425        }
426    
427        public String getVMURL() {
428            URI answer = brokerService.getVmConnectorURI();
429            return answer != null ? answer.toString() : "";
430        }
431    
432        public String getDataDirectory() {
433            File file = brokerService.getDataDirectoryFile();
434            try {
435                return file != null ? file.getCanonicalPath():"";
436            } catch (IOException e) {
437                return "";
438            }
439        }
440    
441        public ObjectName getJMSJobScheduler() {
442            return this.jmsJobScheduler;
443        }
444    
445        public void setJMSJobScheduler(ObjectName name) {
446            this.jmsJobScheduler=name;
447        }
448    
449        private ManagedRegionBroker safeGetBroker() {
450            if (broker == null) {
451                throw new IllegalStateException("Broker is not yet started.");
452            }
453    
454            return broker;
455        }
456    }