001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.File;
020import java.io.IOException;
021import java.net.URI;
022import java.util.HashMap;
023import java.util.Map;
024import java.util.NoSuchElementException;
025import java.util.concurrent.atomic.AtomicInteger;
026
027import javax.management.ObjectName;
028
029import org.apache.activemq.ActiveMQConnectionMetaData;
030import org.apache.activemq.broker.BrokerService;
031import org.apache.activemq.broker.ConnectionContext;
032import org.apache.activemq.broker.TransportConnector;
033import org.apache.activemq.broker.region.Subscription;
034import org.apache.activemq.command.ActiveMQQueue;
035import org.apache.activemq.command.ActiveMQTopic;
036import org.apache.activemq.command.ConsumerId;
037import org.apache.activemq.command.ConsumerInfo;
038import org.apache.activemq.command.RemoveSubscriptionInfo;
039import org.apache.activemq.network.NetworkConnector;
040import org.apache.activemq.util.BrokerSupport;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 *
046 */
047public class BrokerView implements BrokerViewMBean {
048    private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class);
049    ManagedRegionBroker broker;
050    private final BrokerService brokerService;
051    private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
052    private ObjectName jmsJobScheduler;
053
054    public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
055        this.brokerService = brokerService;
056        this.broker = managedBroker;
057    }
058
059    public ManagedRegionBroker getBroker() {
060        return broker;
061    }
062
063    public void setBroker(ManagedRegionBroker broker) {
064        this.broker = broker;
065    }
066
067    @Override
068    public String getBrokerId() {
069        return safeGetBroker().getBrokerId().toString();
070    }
071
072    @Override
073    public String getBrokerName() {
074        return safeGetBroker().getBrokerName();
075    }
076
077    @Override
078    public String getBrokerVersion() {
079        return ActiveMQConnectionMetaData.PROVIDER_VERSION;
080    }
081
082    @Override
083    public String getUptime() {
084        return brokerService.getUptime();
085    }
086
087    @Override
088    public long getUptimeMillis() {
089        return brokerService.getUptimeMillis();
090    }
091
092    @Override
093    public int getCurrentConnectionsCount() {
094        return brokerService.getCurrentConnections();
095    }
096
097    @Override
098    public long getTotalConnectionsCount() {
099        return brokerService.getTotalConnections();
100    }
101
102    @Override
103    public void gc() throws Exception {
104        brokerService.getBroker().gc();
105        try {
106            brokerService.getPersistenceAdapter().checkpoint(true);
107        } catch (IOException e) {
108            LOG.error("Failed to checkpoint persistence adapter on gc request", e);
109        }
110    }
111
112    @Override
113    public void start() throws Exception {
114        brokerService.start();
115    }
116
117    @Override
118    public void stop() throws Exception {
119        brokerService.stop();
120    }
121
122    @Override
123    public void restart() throws Exception {
124        if (brokerService.isRestartAllowed()) {
125            brokerService.requestRestart();
126            brokerService.stop();
127        } else {
128            throw new Exception("Restart is not allowed");
129        }
130    }
131
132    @Override
133    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
134            throws Exception {
135        brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
136    }
137
138    @Override
139    public long getTotalEnqueueCount() {
140        return safeGetBroker().getDestinationStatistics().getEnqueues().getCount();
141    }
142
143    @Override
144    public long getTotalDequeueCount() {
145        return safeGetBroker().getDestinationStatistics().getDequeues().getCount();
146    }
147
148    @Override
149    public long getTotalConsumerCount() {
150        return safeGetBroker().getDestinationStatistics().getConsumers().getCount();
151    }
152
153    @Override
154    public long getTotalProducerCount() {
155        return safeGetBroker().getDestinationStatistics().getProducers().getCount();
156    }
157
158    @Override
159    public long getTotalMessageCount() {
160        return safeGetBroker().getDestinationStatistics().getMessages().getCount();
161    }
162
163    /**
164     * @return the average size of a message (bytes)
165     */
166    @Override
167    public long getAverageMessageSize() {
168        // we are okay with the size without decimals so cast to long
169        return (long) safeGetBroker().getDestinationStatistics().getMessageSize().getAverageSize();
170    }
171
172    /**
173     * @return the max size of a message (bytes)
174     */
175    @Override
176    public long getMaxMessageSize() {
177        return safeGetBroker().getDestinationStatistics().getMessageSize().getMaxSize();
178    }
179
180    /**
181     * @return the min size of a message (bytes)
182     */
183    @Override
184    public long getMinMessageSize() {
185        return safeGetBroker().getDestinationStatistics().getMessageSize().getMinSize();
186    }
187
188    public long getTotalMessagesCached() {
189        return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
190    }
191
192    @Override
193    public int getMemoryPercentUsage() {
194        return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
195    }
196
197    @Override
198    public long getMemoryLimit() {
199        return brokerService.getSystemUsage().getMemoryUsage().getLimit();
200    }
201
202    @Override
203    public void setMemoryLimit(long limit) {
204        brokerService.getSystemUsage().getMemoryUsage().setLimit(limit);
205    }
206
207    @Override
208    public long getStoreLimit() {
209        return brokerService.getSystemUsage().getStoreUsage().getLimit();
210    }
211
212    @Override
213    public int getStorePercentUsage() {
214        return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
215    }
216
217    @Override
218    public long getTempLimit() {
219       return brokerService.getSystemUsage().getTempUsage().getLimit();
220    }
221
222    @Override
223    public int getTempPercentUsage() {
224       return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
225    }
226
227    @Override
228    public long getJobSchedulerStoreLimit() {
229        return brokerService.getSystemUsage().getJobSchedulerUsage().getLimit();
230    }
231
232    @Override
233    public int getJobSchedulerStorePercentUsage() {
234        return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage();
235    }
236
237    @Override
238    public void setStoreLimit(long limit) {
239        brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
240    }
241
242    @Override
243    public void setTempLimit(long limit) {
244        brokerService.getSystemUsage().getTempUsage().setLimit(limit);
245    }
246
247    @Override
248    public void setJobSchedulerStoreLimit(long limit) {
249        brokerService.getSystemUsage().getJobSchedulerUsage().setLimit(limit);
250    }
251
252    @Override
253    public void resetStatistics() {
254        safeGetBroker().getDestinationStatistics().reset();
255    }
256
257    @Override
258    public void enableStatistics() {
259        safeGetBroker().getDestinationStatistics().setEnabled(true);
260    }
261
262    @Override
263    public void disableStatistics() {
264        safeGetBroker().getDestinationStatistics().setEnabled(false);
265    }
266
267    @Override
268    public boolean isStatisticsEnabled() {
269        return safeGetBroker().getDestinationStatistics().isEnabled();
270    }
271
272    @Override
273    public boolean isPersistent() {
274        return brokerService.isPersistent();
275    }
276
277    @Override
278    public void terminateJVM(int exitCode) {
279        System.exit(exitCode);
280    }
281
282    @Override
283    public ObjectName[] getTopics() {
284        return safeGetBroker().getTopicsNonSuppressed();
285    }
286
287    @Override
288    public ObjectName[] getQueues() {
289        return safeGetBroker().getQueuesNonSuppressed();
290    }
291
292    @Override
293    public ObjectName[] getTemporaryTopics() {
294        return safeGetBroker().getTemporaryTopicsNonSuppressed();
295    }
296
297    @Override
298    public ObjectName[] getTemporaryQueues() {
299        return safeGetBroker().getTemporaryQueuesNonSuppressed();
300    }
301
302    @Override
303    public ObjectName[] getTopicSubscribers() {
304        return safeGetBroker().getTopicSubscribersNonSuppressed();
305    }
306
307    @Override
308    public ObjectName[] getDurableTopicSubscribers() {
309        return safeGetBroker().getDurableTopicSubscribersNonSuppressed();
310    }
311
312    @Override
313    public ObjectName[] getQueueSubscribers() {
314        return safeGetBroker().getQueueSubscribersNonSuppressed();
315    }
316
317    @Override
318    public ObjectName[] getTemporaryTopicSubscribers() {
319        return safeGetBroker().getTemporaryTopicSubscribersNonSuppressed();
320    }
321
322    @Override
323    public ObjectName[] getTemporaryQueueSubscribers() {
324        return safeGetBroker().getTemporaryQueueSubscribersNonSuppressed();
325    }
326
327    @Override
328    public ObjectName[] getInactiveDurableTopicSubscribers() {
329        return safeGetBroker().getInactiveDurableTopicSubscribersNonSuppressed();
330    }
331
332    @Override
333    public ObjectName[] getTopicProducers() {
334        return safeGetBroker().getTopicProducersNonSuppressed();
335    }
336
337    @Override
338    public ObjectName[] getQueueProducers() {
339        return safeGetBroker().getQueueProducersNonSuppressed();
340    }
341
342    @Override
343    public ObjectName[] getTemporaryTopicProducers() {
344        return safeGetBroker().getTemporaryTopicProducersNonSuppressed();
345    }
346
347    @Override
348    public ObjectName[] getTemporaryQueueProducers() {
349        return safeGetBroker().getTemporaryQueueProducersNonSuppressed();
350    }
351
352    @Override
353    public ObjectName[] getDynamicDestinationProducers() {
354        return safeGetBroker().getDynamicDestinationProducersNonSuppressed();
355    }
356
357    @Override
358    public String addConnector(String discoveryAddress) throws Exception {
359        TransportConnector connector = brokerService.addConnector(discoveryAddress);
360        if (connector == null) {
361            throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
362        }
363        brokerService.startTransportConnector(connector);
364        return connector.getName();
365    }
366
367    @Override
368    public String addNetworkConnector(String discoveryAddress) throws Exception {
369        NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress);
370        if (connector == null) {
371            throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
372        }
373        brokerService.registerNetworkConnectorMBean(connector);
374        connector.start();
375        return connector.getName();
376    }
377
378    @Override
379    public boolean removeConnector(String connectorName) throws Exception {
380        TransportConnector connector = brokerService.getConnectorByName(connectorName);
381        if (connector == null) {
382            throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
383        }
384        connector.stop();
385        return brokerService.removeConnector(connector);
386    }
387
388    @Override
389    public boolean removeNetworkConnector(String connectorName) throws Exception {
390        NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName);
391        if (connector == null) {
392            throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
393        }
394        connector.stop();
395        return brokerService.removeNetworkConnector(connector);
396    }
397
398    @Override
399    public void addTopic(String name) throws Exception {
400        safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name),true);
401    }
402
403    @Override
404    public void addQueue(String name) throws Exception {
405        safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name),true);
406    }
407
408    @Override
409    public void removeTopic(String name) throws Exception {
410        safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000);
411    }
412
413    @Override
414    public void removeQueue(String name) throws Exception {
415        safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000);
416    }
417
418    @Override
419    public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName,
420                                              String selector) throws Exception {
421        ConnectionContext context = getConnectionContext();
422        context.setBroker(safeGetBroker());
423        context.setClientId(clientId);
424        ConsumerInfo info = new ConsumerInfo();
425        ConsumerId consumerId = new ConsumerId();
426        consumerId.setConnectionId(clientId);
427        consumerId.setSessionId(sessionIdCounter.incrementAndGet());
428        consumerId.setValue(0);
429        info.setConsumerId(consumerId);
430        info.setDestination(new ActiveMQTopic(topicName));
431        info.setSubscriptionName(subscriberName);
432        info.setSelector(selector);
433        Subscription subscription = safeGetBroker().addConsumer(context, info);
434        safeGetBroker().removeConsumer(context, info);
435        if (subscription != null) {
436            return subscription.getObjectName();
437        }
438        return null;
439    }
440
441    @Override
442    public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception {
443        RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
444        info.setClientId(clientId);
445        info.setSubscriptionName(subscriberName);
446        ConnectionContext context = getConnectionContext();
447        context.setBroker(safeGetBroker());
448        context.setClientId(clientId);
449        brokerService.getBroker().removeSubscription(context, info);
450    }
451
452    @Override
453    public void reloadLog4jProperties() throws Throwable {
454        Log4JConfigView.doReloadLog4jProperties();
455    }
456
457    @Override
458    public  Map<String, String> getTransportConnectors() {
459        Map<String, String> answer = new HashMap<String, String>();
460        try {
461            for (TransportConnector connector : brokerService.getTransportConnectors()) {
462                answer.put(connector.getName(), connector.getConnectUri().toString());
463            }
464        } catch (Exception e) {
465            LOG.debug("Failed to read URI to build transport connectors map", e);
466        }
467        return answer;
468    }
469
470    @Override
471    public String getTransportConnectorByType(String type) {
472        return brokerService.getTransportConnectorURIsAsMap().get(type);
473    }
474
475    @Override
476    @Deprecated
477    /**
478     * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
479     */
480    public String getOpenWireURL() {
481        String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
482        return answer != null ? answer : "";
483    }
484
485    @Override
486    @Deprecated
487    /**
488     * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
489     */
490    public String getStompURL() {
491        String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
492        return answer != null ? answer : "";
493    }
494
495    @Override
496    @Deprecated
497    /**
498     * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
499     */
500    public String getSslURL() {
501        String answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
502        return answer != null ? answer : "";
503    }
504
505    @Override
506    @Deprecated
507    /**
508     * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
509     */
510    public String getStompSslURL() {
511        String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
512        return answer != null ? answer : "";
513    }
514
515    @Override
516    public String getVMURL() {
517        URI answer = brokerService.getVmConnectorURI();
518        return answer != null ? answer.toString() : "";
519    }
520
521    @Override
522    public String getDataDirectory() {
523        File file = brokerService.getDataDirectoryFile();
524        try {
525            return file != null ? file.getCanonicalPath():"";
526        } catch (IOException e) {
527            return "";
528        }
529    }
530
531    @Override
532    public ObjectName getJMSJobScheduler() {
533        return this.jmsJobScheduler;
534    }
535
536    public void setJMSJobScheduler(ObjectName name) {
537        this.jmsJobScheduler=name;
538    }
539
540    @Override
541    public boolean isSlave() {
542        return brokerService.isSlave();
543    }
544
545    private ManagedRegionBroker safeGetBroker() {
546        if (broker == null) {
547            throw new IllegalStateException("Broker is not yet started.");
548        }
549
550        return broker;
551    }
552
553    private ConnectionContext getConnectionContext() {
554        ConnectionContext context;
555        if(broker == null) {
556            context = new ConnectionContext();
557
558        }
559        else {
560            ConnectionContext sharedContext = BrokerSupport.getConnectionContext(broker.getContextBroker());
561            //Make a local copy of the sharedContext. We do this because we do not want to set a clientId on the
562            //global sharedContext. Taking a copy of the sharedContext is a good way to make sure that we are not
563            //messing up the shared context
564            context = sharedContext.copy();
565        }
566
567        return context;
568    }
569}