001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.File;
020import java.io.IOException;
021import java.net.URI;
022import java.util.HashMap;
023import java.util.Map;
024import java.util.NoSuchElementException;
025import java.util.concurrent.atomic.AtomicInteger;
026
027import javax.management.ObjectName;
028
029import org.apache.activemq.ActiveMQConnectionMetaData;
030import org.apache.activemq.broker.BrokerService;
031import org.apache.activemq.broker.ConnectionContext;
032import org.apache.activemq.broker.TransportConnector;
033import org.apache.activemq.broker.region.Subscription;
034import org.apache.activemq.command.ActiveMQQueue;
035import org.apache.activemq.command.ActiveMQTopic;
036import org.apache.activemq.command.ConsumerId;
037import org.apache.activemq.command.ConsumerInfo;
038import org.apache.activemq.command.RemoveSubscriptionInfo;
039import org.apache.activemq.network.NetworkConnector;
040import org.apache.activemq.util.BrokerSupport;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044public class BrokerView implements BrokerViewMBean {
045
046    private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class);
047
048    ManagedRegionBroker broker;
049
050    private final BrokerService brokerService;
051    private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
052    private ObjectName jmsJobScheduler;
053
054    public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
055        this.brokerService = brokerService;
056        this.broker = managedBroker;
057    }
058
059    public ManagedRegionBroker getBroker() {
060        return broker;
061    }
062
063    public void setBroker(ManagedRegionBroker broker) {
064        this.broker = broker;
065    }
066
067    @Override
068    public String getBrokerId() {
069        return safeGetBroker().getBrokerId().toString();
070    }
071
072    @Override
073    public String getBrokerName() {
074        return safeGetBroker().getBrokerName();
075    }
076
077    @Override
078    public String getBrokerVersion() {
079        return ActiveMQConnectionMetaData.PROVIDER_VERSION;
080    }
081
082    @Override
083    public String getUptime() {
084        return brokerService.getUptime();
085    }
086
087    @Override
088    public long getUptimeMillis() {
089        return brokerService.getUptimeMillis();
090    }
091
092    @Override
093    public int getCurrentConnectionsCount() {
094        return brokerService.getCurrentConnections();
095    }
096
097    @Override
098    public long getTotalConnectionsCount() {
099        return brokerService.getTotalConnections();
100    }
101
102    @Override
103    public void gc() throws Exception {
104        brokerService.getBroker().gc();
105        try {
106            brokerService.getPersistenceAdapter().checkpoint(true);
107        } catch (IOException e) {
108            LOG.error("Failed to checkpoint persistence adapter on gc request", e);
109        }
110    }
111
112    @Override
113    public void start() throws Exception {
114        brokerService.start();
115    }
116
117    @Override
118    public void stop() throws Exception {
119        brokerService.stop();
120    }
121
122    @Override
123    public void restart() throws Exception {
124        if (brokerService.isRestartAllowed()) {
125            brokerService.requestRestart();
126            brokerService.stop();
127        } else {
128            throw new Exception("Restart is not allowed");
129        }
130    }
131
132    @Override
133    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
134        brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
135    }
136
137    @Override
138    public long getTotalEnqueueCount() {
139        return safeGetBroker().getDestinationStatistics().getEnqueues().getCount();
140    }
141
142    @Override
143    public long getTotalDequeueCount() {
144        return safeGetBroker().getDestinationStatistics().getDequeues().getCount();
145    }
146
147    @Override
148    public long getTotalConsumerCount() {
149        return safeGetBroker().getDestinationStatistics().getConsumers().getCount();
150    }
151
152    @Override
153    public long getTotalProducerCount() {
154        return safeGetBroker().getDestinationStatistics().getProducers().getCount();
155    }
156
157    @Override
158    public long getTotalMessageCount() {
159        return safeGetBroker().getDestinationStatistics().getMessages().getCount();
160    }
161
162    /**
163     * @return the average size of a message (bytes)
164     */
165    @Override
166    public long getAverageMessageSize() {
167        // we are okay with the size without decimals so cast to long
168        return (long) safeGetBroker().getDestinationStatistics().getMessageSize().getAverageSize();
169    }
170
171    /**
172     * @return the max size of a message (bytes)
173     */
174    @Override
175    public long getMaxMessageSize() {
176        return safeGetBroker().getDestinationStatistics().getMessageSize().getMaxSize();
177    }
178
179    /**
180     * @return the min size of a message (bytes)
181     */
182    @Override
183    public long getMinMessageSize() {
184        return safeGetBroker().getDestinationStatistics().getMessageSize().getMinSize();
185    }
186
187    public long getTotalMessagesCached() {
188        return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
189    }
190
191    @Override
192    public int getMemoryPercentUsage() {
193        return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
194    }
195
196    @Override
197    public long getMemoryLimit() {
198        return brokerService.getSystemUsage().getMemoryUsage().getLimit();
199    }
200
201    @Override
202    public void setMemoryLimit(long limit) {
203        brokerService.getSystemUsage().getMemoryUsage().setLimit(limit);
204    }
205
206    @Override
207    public long getStoreLimit() {
208        return brokerService.getSystemUsage().getStoreUsage().getLimit();
209    }
210
211    @Override
212    public int getStorePercentUsage() {
213        return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
214    }
215
216    @Override
217    public long getTempLimit() {
218        return brokerService.getSystemUsage().getTempUsage().getLimit();
219    }
220
221    @Override
222    public int getTempPercentUsage() {
223        return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
224    }
225
226    @Override
227    public long getJobSchedulerStoreLimit() {
228        return brokerService.getSystemUsage().getJobSchedulerUsage().getLimit();
229    }
230
231    @Override
232    public int getJobSchedulerStorePercentUsage() {
233        return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage();
234    }
235
236    @Override
237    public void setStoreLimit(long limit) {
238        brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
239    }
240
241    @Override
242    public void setTempLimit(long limit) {
243        brokerService.getSystemUsage().getTempUsage().setLimit(limit);
244    }
245
246    @Override
247    public void setJobSchedulerStoreLimit(long limit) {
248        brokerService.getSystemUsage().getJobSchedulerUsage().setLimit(limit);
249    }
250
251    @Override
252    public void resetStatistics() {
253        safeGetBroker().getDestinationStatistics().reset();
254    }
255
256    @Override
257    public void enableStatistics() {
258        safeGetBroker().getDestinationStatistics().setEnabled(true);
259    }
260
261    @Override
262    public void disableStatistics() {
263        safeGetBroker().getDestinationStatistics().setEnabled(false);
264    }
265
266    @Override
267    public boolean isStatisticsEnabled() {
268        return safeGetBroker().getDestinationStatistics().isEnabled();
269    }
270
271    @Override
272    public boolean isPersistent() {
273        return brokerService.isPersistent();
274    }
275
276    @Override
277    public void terminateJVM(int exitCode) {
278        System.exit(exitCode);
279    }
280
281    @Override
282    public ObjectName[] getTopics() {
283        return safeGetBroker().getTopicsNonSuppressed();
284    }
285
286    @Override
287    public ObjectName[] getQueues() {
288        return safeGetBroker().getQueuesNonSuppressed();
289    }
290
291    @Override
292    public ObjectName[] getTemporaryTopics() {
293        return safeGetBroker().getTemporaryTopicsNonSuppressed();
294    }
295
296    @Override
297    public ObjectName[] getTemporaryQueues() {
298        return safeGetBroker().getTemporaryQueuesNonSuppressed();
299    }
300
301    @Override
302    public ObjectName[] getTopicSubscribers() {
303        return safeGetBroker().getTopicSubscribersNonSuppressed();
304    }
305
306    @Override
307    public ObjectName[] getDurableTopicSubscribers() {
308        return safeGetBroker().getDurableTopicSubscribersNonSuppressed();
309    }
310
311    @Override
312    public ObjectName[] getQueueSubscribers() {
313        return safeGetBroker().getQueueSubscribersNonSuppressed();
314    }
315
316    @Override
317    public ObjectName[] getTemporaryTopicSubscribers() {
318        return safeGetBroker().getTemporaryTopicSubscribersNonSuppressed();
319    }
320
321    @Override
322    public ObjectName[] getTemporaryQueueSubscribers() {
323        return safeGetBroker().getTemporaryQueueSubscribersNonSuppressed();
324    }
325
326    @Override
327    public ObjectName[] getInactiveDurableTopicSubscribers() {
328        return safeGetBroker().getInactiveDurableTopicSubscribersNonSuppressed();
329    }
330
331    @Override
332    public ObjectName[] getTopicProducers() {
333        return safeGetBroker().getTopicProducersNonSuppressed();
334    }
335
336    @Override
337    public ObjectName[] getQueueProducers() {
338        return safeGetBroker().getQueueProducersNonSuppressed();
339    }
340
341    @Override
342    public ObjectName[] getTemporaryTopicProducers() {
343        return safeGetBroker().getTemporaryTopicProducersNonSuppressed();
344    }
345
346    @Override
347    public ObjectName[] getTemporaryQueueProducers() {
348        return safeGetBroker().getTemporaryQueueProducersNonSuppressed();
349    }
350
351    @Override
352    public ObjectName[] getDynamicDestinationProducers() {
353        return safeGetBroker().getDynamicDestinationProducersNonSuppressed();
354    }
355
356    @Override
357    public String addConnector(String discoveryAddress) throws Exception {
358        TransportConnector connector = brokerService.addConnector(discoveryAddress);
359        if (connector == null) {
360            throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
361        }
362        brokerService.startTransportConnector(connector);
363        return connector.getName();
364    }
365
366    @Override
367    public String addNetworkConnector(String discoveryAddress) throws Exception {
368        NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress);
369        if (connector == null) {
370            throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
371        }
372        brokerService.registerNetworkConnectorMBean(connector);
373        connector.start();
374        return connector.getName();
375    }
376
377    @Override
378    public boolean removeConnector(String connectorName) throws Exception {
379        TransportConnector connector = brokerService.getConnectorByName(connectorName);
380        if (connector == null) {
381            throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
382        }
383        connector.stop();
384        return brokerService.removeConnector(connector);
385    }
386
387    @Override
388    public boolean removeNetworkConnector(String connectorName) throws Exception {
389        NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName);
390        if (connector == null) {
391            throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
392        }
393        connector.stop();
394        return brokerService.removeNetworkConnector(connector);
395    }
396
397    @Override
398    public void addTopic(String name) throws Exception {
399        safeGetBroker().getContextBroker()
400            .addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), true);
401    }
402
403    @Override
404    public void addQueue(String name) throws Exception {
405        safeGetBroker().getContextBroker()
406            .addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), true);
407    }
408
409    @Override
410    public void removeTopic(String name) throws Exception {
411        safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000);
412    }
413
414    @Override
415    public void removeQueue(String name) throws Exception {
416        safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000);
417    }
418
419    @Override
420    public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception {
421        ConnectionContext context = getConnectionContext();
422        context.setBroker(safeGetBroker());
423        context.setClientId(clientId);
424        ConsumerInfo info = new ConsumerInfo();
425        ConsumerId consumerId = new ConsumerId();
426        consumerId.setConnectionId(clientId);
427        consumerId.setSessionId(sessionIdCounter.incrementAndGet());
428        consumerId.setValue(0);
429        info.setConsumerId(consumerId);
430        info.setDestination(new ActiveMQTopic(topicName));
431        info.setSubscriptionName(subscriberName);
432        info.setSelector(selector);
433        Subscription subscription = safeGetBroker().addConsumer(context, info);
434        safeGetBroker().removeConsumer(context, info);
435        if (subscription != null) {
436            return subscription.getObjectName();
437        }
438        return null;
439    }
440
441    @Override
442    public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception {
443        RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
444        info.setClientId(clientId);
445        info.setSubscriptionName(subscriberName);
446        ConnectionContext context = getConnectionContext();
447        context.setBroker(safeGetBroker());
448        context.setClientId(clientId);
449        brokerService.getBroker().removeSubscription(context, info);
450    }
451
452    @Override
453    public void reloadLog4jProperties() throws Throwable {
454        Log4JConfigView.doReloadLog4jProperties();
455    }
456
457    @Override
458    public Map<String, String> getTransportConnectors() {
459        Map<String, String> answer = new HashMap<String, String>();
460        try {
461            for (TransportConnector connector : brokerService.getTransportConnectors()) {
462                answer.put(connector.getName(), connector.getConnectUri().toString());
463            }
464        } catch (Exception e) {
465            LOG.debug("Failed to read URI to build transport connectors map", e);
466        }
467        return answer;
468    }
469
470    @Override
471    public String getTransportConnectorByType(String type) {
472        return brokerService.getTransportConnectorURIsAsMap().get(type);
473    }
474
475    @Override
476    public String getVMURL() {
477        URI answer = brokerService.getVmConnectorURI();
478        return answer != null ? answer.toString() : "";
479    }
480
481    @Override
482    public String getDataDirectory() {
483        File file = brokerService.getDataDirectoryFile();
484        try {
485            return file != null ? file.getCanonicalPath() : "";
486        } catch (IOException e) {
487            return "";
488        }
489    }
490
491    @Override
492    public ObjectName getJMSJobScheduler() {
493        return this.jmsJobScheduler;
494    }
495
496    public void setJMSJobScheduler(ObjectName name) {
497        this.jmsJobScheduler = name;
498    }
499
500    @Override
501    public boolean isSlave() {
502        return brokerService.isSlave();
503    }
504
505    private ManagedRegionBroker safeGetBroker() {
506        if (broker == null) {
507            throw new IllegalStateException("Broker is not yet started.");
508        }
509
510        return broker;
511    }
512
513    private ConnectionContext getConnectionContext() {
514        ConnectionContext context;
515        if (broker == null) {
516            context = new ConnectionContext();
517        } else {
518            ConnectionContext sharedContext = BrokerSupport.getConnectionContext(broker.getContextBroker());
519            // Make a local copy of the sharedContext. We do this because we do
520            // not want to set a clientId on the
521            // global sharedContext. Taking a copy of the sharedContext is a
522            // good way to make sure that we are not
523            // messing up the shared context
524            context = sharedContext.copy();
525        }
526
527        return context;
528    }
529}