001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.jmx;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.lang.reflect.InvocationTargetException;
022    import java.lang.reflect.Method;
023    import java.net.MalformedURLException;
024    import java.net.URI;
025    import java.net.URL;
026    import java.util.HashMap;
027    import java.util.Map;
028    import java.util.NoSuchElementException;
029    import java.util.concurrent.atomic.AtomicInteger;
030    import javax.management.ObjectName;
031    
032    import org.apache.activemq.ActiveMQConnectionMetaData;
033    import org.apache.activemq.broker.BrokerService;
034    import org.apache.activemq.broker.ConnectionContext;
035    import org.apache.activemq.broker.TransportConnector;
036    import org.apache.activemq.broker.region.Subscription;
037    import org.apache.activemq.command.ActiveMQQueue;
038    import org.apache.activemq.command.ActiveMQTopic;
039    import org.apache.activemq.command.ConsumerId;
040    import org.apache.activemq.command.ConsumerInfo;
041    import org.apache.activemq.command.RemoveSubscriptionInfo;
042    import org.apache.activemq.network.NetworkConnector;
043    import org.apache.activemq.util.BrokerSupport;
044    import org.slf4j.Logger;
045    import org.slf4j.LoggerFactory;
046    
047    /**
048     *
049     */
050    public class BrokerView implements BrokerViewMBean {
051        private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class);
052        ManagedRegionBroker broker;
053        private final BrokerService brokerService;
054        private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
055        private ObjectName jmsJobScheduler;
056    
057        public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
058            this.brokerService = brokerService;
059            this.broker = managedBroker;
060        }
061    
062        public ManagedRegionBroker getBroker() {
063            return broker;
064        }
065    
066        public void setBroker(ManagedRegionBroker broker) {
067            this.broker = broker;
068        }
069    
070        public String getBrokerId() {
071            return safeGetBroker().getBrokerId().toString();
072        }
073    
074        public String getBrokerName() {
075            return safeGetBroker().getBrokerName();
076        }
077    
078        public String getBrokerVersion() {
079            return ActiveMQConnectionMetaData.PROVIDER_VERSION;
080        }
081    
082        @Override
083        public String getUptime() {
084            return brokerService.getUptime();
085        }
086    
087        public void gc() throws Exception {
088            brokerService.getBroker().gc();
089            try {
090                brokerService.getPersistenceAdapter().checkpoint(true);
091            } catch (IOException e) {
092                LOG.error("Failed to checkpoint persistence adapter on gc request, reason:" + e, e);
093            }
094        }
095    
096        public void start() throws Exception {
097            brokerService.start();
098        }
099    
100        public void stop() throws Exception {
101            brokerService.stop();
102        }
103    
104        public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
105                throws Exception {
106            brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
107        }
108    
109        public long getTotalEnqueueCount() {
110            return safeGetBroker().getDestinationStatistics().getEnqueues().getCount();
111        }
112    
113        public long getTotalDequeueCount() {
114            return safeGetBroker().getDestinationStatistics().getDequeues().getCount();
115        }
116    
117        public long getTotalConsumerCount() {
118            return safeGetBroker().getDestinationStatistics().getConsumers().getCount();
119        }
120    
121        public long getTotalProducerCount() {
122            return safeGetBroker().getDestinationStatistics().getProducers().getCount();
123        }
124    
125        public long getTotalMessageCount() {
126            return safeGetBroker().getDestinationStatistics().getMessages().getCount();
127        }
128    
129        public long getTotalMessagesCached() {
130            return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
131        }
132    
133        public int getMemoryPercentUsage() {
134            return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
135        }
136    
137        public long getMemoryLimit() {
138            return brokerService.getSystemUsage().getMemoryUsage().getLimit();
139        }
140    
141        public void setMemoryLimit(long limit) {
142            brokerService.getSystemUsage().getMemoryUsage().setLimit(limit);
143        }
144    
145        public long getStoreLimit() {
146            return brokerService.getSystemUsage().getStoreUsage().getLimit();
147        }
148    
149        public int getStorePercentUsage() {
150            return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
151        }
152    
153        public long getTempLimit() {
154           return brokerService.getSystemUsage().getTempUsage().getLimit();
155        }
156    
157        public int getTempPercentUsage() {
158           return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
159        }
160        
161        public long getJobSchedulerStoreLimit() {
162            return brokerService.getSystemUsage().getJobSchedulerUsage().getLimit();
163        }
164        
165        public int getJobSchedulerStorePercentUsage() {
166            return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage();
167        }
168    
169        public void setStoreLimit(long limit) {
170            brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
171        }
172    
173        public void setTempLimit(long limit) {
174            brokerService.getSystemUsage().getTempUsage().setLimit(limit);
175        }
176        
177        public void setJobSchedulerStoreLimit(long limit) {
178            brokerService.getSystemUsage().getJobSchedulerUsage().setLimit(limit);
179        }
180    
181        public void resetStatistics() {
182            safeGetBroker().getDestinationStatistics().reset();
183        }
184    
185        public void enableStatistics() {
186            safeGetBroker().getDestinationStatistics().setEnabled(true);
187        }
188    
189        public void disableStatistics() {
190            safeGetBroker().getDestinationStatistics().setEnabled(false);
191        }
192    
193        public boolean isStatisticsEnabled() {
194            return safeGetBroker().getDestinationStatistics().isEnabled();
195        }
196    
197        public boolean isPersistent() {
198            return brokerService.isPersistent();
199        }
200    
201        public void terminateJVM(int exitCode) {
202            System.exit(exitCode);
203        }
204    
205        public ObjectName[] getTopics() {
206            return safeGetBroker().getTopics();
207        }
208    
209        public ObjectName[] getQueues() {
210            return safeGetBroker().getQueues();
211        }
212    
213        public ObjectName[] getTemporaryTopics() {
214            return safeGetBroker().getTemporaryTopics();
215        }
216    
217        public ObjectName[] getTemporaryQueues() {
218            return safeGetBroker().getTemporaryQueues();
219        }
220    
221        public ObjectName[] getTopicSubscribers() {
222            return safeGetBroker().getTopicSubscribers();
223        }
224    
225        public ObjectName[] getDurableTopicSubscribers() {
226            return safeGetBroker().getDurableTopicSubscribers();
227        }
228    
229        public ObjectName[] getQueueSubscribers() {
230            return safeGetBroker().getQueueSubscribers();
231        }
232    
233        public ObjectName[] getTemporaryTopicSubscribers() {
234            return safeGetBroker().getTemporaryTopicSubscribers();
235        }
236    
237        public ObjectName[] getTemporaryQueueSubscribers() {
238            return safeGetBroker().getTemporaryQueueSubscribers();
239        }
240    
241        public ObjectName[] getInactiveDurableTopicSubscribers() {
242            return safeGetBroker().getInactiveDurableTopicSubscribers();
243        }
244    
245        public ObjectName[] getTopicProducers() {
246            return safeGetBroker().getTopicProducers();
247        }
248    
249        public ObjectName[] getQueueProducers() {
250            return safeGetBroker().getQueueProducers();
251        }
252    
253        public ObjectName[] getTemporaryTopicProducers() {
254            return safeGetBroker().getTemporaryTopicProducers();
255        }
256    
257        public ObjectName[] getTemporaryQueueProducers() {
258            return safeGetBroker().getTemporaryQueueProducers();
259        }
260    
261        public ObjectName[] getDynamicDestinationProducers() {
262            return safeGetBroker().getDynamicDestinationProducers();
263        }
264    
265        public String addConnector(String discoveryAddress) throws Exception {
266            TransportConnector connector = brokerService.addConnector(discoveryAddress);
267            if (connector == null) {
268                throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
269            }
270            connector.start();
271            return connector.getName();
272        }
273    
274        public String addNetworkConnector(String discoveryAddress) throws Exception {
275            NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress);
276            if (connector == null) {
277                throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
278            }
279            connector.start();
280            return connector.getName();
281        }
282    
283        public boolean removeConnector(String connectorName) throws Exception {
284            TransportConnector connector = brokerService.getConnectorByName(connectorName);
285            if (connector == null) {
286                throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
287            }
288            connector.stop();
289            return brokerService.removeConnector(connector);
290        }
291    
292        public boolean removeNetworkConnector(String connectorName) throws Exception {
293            NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName);
294            if (connector == null) {
295                throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
296            }
297            connector.stop();
298            return brokerService.removeNetworkConnector(connector);
299        }
300    
301        public void addTopic(String name) throws Exception {
302            safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name),true);
303        }
304    
305        public void addQueue(String name) throws Exception {
306            safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name),true);
307        }
308    
309        public void removeTopic(String name) throws Exception {
310            safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000);
311        }
312    
313        public void removeQueue(String name) throws Exception {
314            safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000);
315        }
316    
317        public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName,
318                                                  String selector) throws Exception {
319            ConnectionContext context = new ConnectionContext();
320            context.setBroker(safeGetBroker());
321            context.setClientId(clientId);
322            ConsumerInfo info = new ConsumerInfo();
323            ConsumerId consumerId = new ConsumerId();
324            consumerId.setConnectionId(clientId);
325            consumerId.setSessionId(sessionIdCounter.incrementAndGet());
326            consumerId.setValue(0);
327            info.setConsumerId(consumerId);
328            info.setDestination(new ActiveMQTopic(topicName));
329            info.setSubscriptionName(subscriberName);
330            info.setSelector(selector);
331            Subscription subscription = safeGetBroker().addConsumer(context, info);
332            safeGetBroker().removeConsumer(context, info);
333            if (subscription != null) {
334                return subscription.getObjectName();
335            }
336            return null;
337        }
338    
339        public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception {
340            RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
341            info.setClientId(clientId);
342            info.setSubscriptionName(subscriberName);
343            ConnectionContext context = new ConnectionContext();
344            context.setBroker(safeGetBroker());
345            context.setClientId(clientId);
346            safeGetBroker().removeSubscription(context, info);
347        }
348    
349        //  doc comment inherited from BrokerViewMBean
350        public void reloadLog4jProperties() throws Throwable {
351    
352            // Avoid a direct dependency on log4j.. use reflection.
353            try {
354                ClassLoader cl = getClass().getClassLoader();
355                Class<?> logManagerClass = cl.loadClass("org.apache.log4j.LogManager");
356    
357                Method resetConfiguration = logManagerClass.getMethod("resetConfiguration", new Class[]{});
358                resetConfiguration.invoke(null, new Object[]{});
359    
360                String configurationOptionStr = System.getProperty("log4j.configuration");
361                URL log4jprops = null;
362                if (configurationOptionStr != null) {
363                    try {
364                        log4jprops = new URL(configurationOptionStr);
365                    } catch (MalformedURLException ex) {
366                        log4jprops = cl.getResource("log4j.properties");
367                    }
368                } else {
369                   log4jprops = cl.getResource("log4j.properties");
370                }
371    
372                if (log4jprops != null) {
373                    Class<?> propertyConfiguratorClass = cl.loadClass("org.apache.log4j.PropertyConfigurator");
374                    Method configure = propertyConfiguratorClass.getMethod("configure", new Class[]{URL.class});
375                    configure.invoke(null, new Object[]{log4jprops});
376                }
377            } catch (InvocationTargetException e) {
378                throw e.getTargetException();
379            }
380        }
381    
382        public  Map<String, String> getTransportConnectors() {
383            Map<String, String> answer = new HashMap<String, String>();
384            try {
385                for (TransportConnector connector : brokerService.getTransportConnectors()) {
386                    answer.put(connector.getName(), connector.getConnectUri().toString());
387                }
388            } catch (Exception e) {
389                LOG.debug("Failed to read URI to build transport connectors map", e);
390            }
391            return answer;
392        }
393    
394        @Override
395        public String getTransportConnectorByType(String type) {
396            return brokerService.getTransportConnectorURIsAsMap().get(type);
397        }
398    
399        @Deprecated
400        /**
401         * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
402         */
403        public String getOpenWireURL() {
404            String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
405            return answer != null ? answer : "";
406        }
407    
408        @Deprecated
409        /**
410         * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
411         */
412        public String getStompURL() {
413            String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
414            return answer != null ? answer : "";
415        }
416    
417        @Deprecated
418        /**
419         * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
420         */
421        public String getSslURL() {
422            String answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
423            return answer != null ? answer : "";
424        }
425    
426        @Deprecated
427        /**
428         * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
429         */
430        public String getStompSslURL() {
431            String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
432            return answer != null ? answer : "";
433        }
434    
435        public String getVMURL() {
436            URI answer = brokerService.getVmConnectorURI();
437            return answer != null ? answer.toString() : "";
438        }
439    
440        public String getDataDirectory() {
441            File file = brokerService.getDataDirectoryFile();
442            try {
443                return file != null ? file.getCanonicalPath():"";
444            } catch (IOException e) {
445                return "";
446            }
447        }
448    
449        public ObjectName getJMSJobScheduler() {
450            return this.jmsJobScheduler;
451        }
452    
453        public void setJMSJobScheduler(ObjectName name) {
454            this.jmsJobScheduler=name;
455        }
456    
457        private ManagedRegionBroker safeGetBroker() {
458            if (broker == null) {
459                throw new IllegalStateException("Broker is not yet started.");
460            }
461    
462            return broker;
463        }
464    }