001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker.jmx;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.lang.reflect.InvocationTargetException;
022 import java.lang.reflect.Method;
023 import java.net.MalformedURLException;
024 import java.net.URI;
025 import java.net.URL;
026 import java.util.HashMap;
027 import java.util.Map;
028 import java.util.NoSuchElementException;
029 import java.util.concurrent.atomic.AtomicInteger;
030 import javax.management.ObjectName;
031
032 import org.apache.activemq.ActiveMQConnectionMetaData;
033 import org.apache.activemq.broker.BrokerService;
034 import org.apache.activemq.broker.ConnectionContext;
035 import org.apache.activemq.broker.TransportConnector;
036 import org.apache.activemq.broker.region.Subscription;
037 import org.apache.activemq.command.ActiveMQQueue;
038 import org.apache.activemq.command.ActiveMQTopic;
039 import org.apache.activemq.command.ConsumerId;
040 import org.apache.activemq.command.ConsumerInfo;
041 import org.apache.activemq.command.RemoveSubscriptionInfo;
042 import org.apache.activemq.network.NetworkConnector;
043 import org.apache.activemq.util.BrokerSupport;
044 import org.slf4j.Logger;
045 import org.slf4j.LoggerFactory;
046
047 /**
048 *
049 */
050 public class BrokerView implements BrokerViewMBean {
051 private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class);
052 ManagedRegionBroker broker;
053 private final BrokerService brokerService;
054 private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
055 private ObjectName jmsJobScheduler;
056
057 public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
058 this.brokerService = brokerService;
059 this.broker = managedBroker;
060 }
061
062 public ManagedRegionBroker getBroker() {
063 return broker;
064 }
065
066 public void setBroker(ManagedRegionBroker broker) {
067 this.broker = broker;
068 }
069
070 public String getBrokerId() {
071 return safeGetBroker().getBrokerId().toString();
072 }
073
074 public String getBrokerName() {
075 return safeGetBroker().getBrokerName();
076 }
077
078 public String getBrokerVersion() {
079 return ActiveMQConnectionMetaData.PROVIDER_VERSION;
080 }
081
082 @Override
083 public String getUptime() {
084 return brokerService.getUptime();
085 }
086
087 public void gc() throws Exception {
088 brokerService.getBroker().gc();
089 try {
090 brokerService.getPersistenceAdapter().checkpoint(true);
091 } catch (IOException e) {
092 LOG.error("Failed to checkpoint persistence adapter on gc request, reason:" + e, e);
093 }
094 }
095
096 public void start() throws Exception {
097 brokerService.start();
098 }
099
100 public void stop() throws Exception {
101 brokerService.stop();
102 }
103
104 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
105 throws Exception {
106 brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
107 }
108
109 public long getTotalEnqueueCount() {
110 return safeGetBroker().getDestinationStatistics().getEnqueues().getCount();
111 }
112
113 public long getTotalDequeueCount() {
114 return safeGetBroker().getDestinationStatistics().getDequeues().getCount();
115 }
116
117 public long getTotalConsumerCount() {
118 return safeGetBroker().getDestinationStatistics().getConsumers().getCount();
119 }
120
121 public long getTotalProducerCount() {
122 return safeGetBroker().getDestinationStatistics().getProducers().getCount();
123 }
124
125 public long getTotalMessageCount() {
126 return safeGetBroker().getDestinationStatistics().getMessages().getCount();
127 }
128
129 public long getTotalMessagesCached() {
130 return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
131 }
132
133 public int getMemoryPercentUsage() {
134 return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
135 }
136
137 public long getMemoryLimit() {
138 return brokerService.getSystemUsage().getMemoryUsage().getLimit();
139 }
140
141 public void setMemoryLimit(long limit) {
142 brokerService.getSystemUsage().getMemoryUsage().setLimit(limit);
143 }
144
145 public long getStoreLimit() {
146 return brokerService.getSystemUsage().getStoreUsage().getLimit();
147 }
148
149 public int getStorePercentUsage() {
150 return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
151 }
152
153 public long getTempLimit() {
154 return brokerService.getSystemUsage().getTempUsage().getLimit();
155 }
156
157 public int getTempPercentUsage() {
158 return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
159 }
160
161 public void setStoreLimit(long limit) {
162 brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
163 }
164
165 public void setTempLimit(long limit) {
166 brokerService.getSystemUsage().getTempUsage().setLimit(limit);
167 }
168
169 public void resetStatistics() {
170 safeGetBroker().getDestinationStatistics().reset();
171 }
172
173 public void enableStatistics() {
174 safeGetBroker().getDestinationStatistics().setEnabled(true);
175 }
176
177 public void disableStatistics() {
178 safeGetBroker().getDestinationStatistics().setEnabled(false);
179 }
180
181 public boolean isStatisticsEnabled() {
182 return safeGetBroker().getDestinationStatistics().isEnabled();
183 }
184
185 public boolean isPersistent() {
186 return brokerService.isPersistent();
187 }
188
189 public boolean isSlave() {
190 return brokerService.isSlave();
191 }
192
193 public void terminateJVM(int exitCode) {
194 System.exit(exitCode);
195 }
196
197 public ObjectName[] getTopics() {
198 return safeGetBroker().getTopics();
199 }
200
201 public ObjectName[] getQueues() {
202 return safeGetBroker().getQueues();
203 }
204
205 public ObjectName[] getTemporaryTopics() {
206 return safeGetBroker().getTemporaryTopics();
207 }
208
209 public ObjectName[] getTemporaryQueues() {
210 return safeGetBroker().getTemporaryQueues();
211 }
212
213 public ObjectName[] getTopicSubscribers() {
214 return safeGetBroker().getTopicSubscribers();
215 }
216
217 public ObjectName[] getDurableTopicSubscribers() {
218 return safeGetBroker().getDurableTopicSubscribers();
219 }
220
221 public ObjectName[] getQueueSubscribers() {
222 return safeGetBroker().getQueueSubscribers();
223 }
224
225 public ObjectName[] getTemporaryTopicSubscribers() {
226 return safeGetBroker().getTemporaryTopicSubscribers();
227 }
228
229 public ObjectName[] getTemporaryQueueSubscribers() {
230 return safeGetBroker().getTemporaryQueueSubscribers();
231 }
232
233 public ObjectName[] getInactiveDurableTopicSubscribers() {
234 return safeGetBroker().getInactiveDurableTopicSubscribers();
235 }
236
237 public ObjectName[] getTopicProducers() {
238 return safeGetBroker().getTopicProducers();
239 }
240
241 public ObjectName[] getQueueProducers() {
242 return safeGetBroker().getQueueProducers();
243 }
244
245 public ObjectName[] getTemporaryTopicProducers() {
246 return safeGetBroker().getTemporaryTopicProducers();
247 }
248
249 public ObjectName[] getTemporaryQueueProducers() {
250 return safeGetBroker().getTemporaryQueueProducers();
251 }
252
253 public ObjectName[] getDynamicDestinationProducers() {
254 return safeGetBroker().getDynamicDestinationProducers();
255 }
256
257 public String addConnector(String discoveryAddress) throws Exception {
258 TransportConnector connector = brokerService.addConnector(discoveryAddress);
259 if (connector == null) {
260 throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
261 }
262 connector.start();
263 return connector.getName();
264 }
265
266 public String addNetworkConnector(String discoveryAddress) throws Exception {
267 NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress);
268 if (connector == null) {
269 throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
270 }
271 connector.start();
272 return connector.getName();
273 }
274
275 public boolean removeConnector(String connectorName) throws Exception {
276 TransportConnector connector = brokerService.getConnectorByName(connectorName);
277 if (connector == null) {
278 throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
279 }
280 connector.stop();
281 return brokerService.removeConnector(connector);
282 }
283
284 public boolean removeNetworkConnector(String connectorName) throws Exception {
285 NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName);
286 if (connector == null) {
287 throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
288 }
289 connector.stop();
290 return brokerService.removeNetworkConnector(connector);
291 }
292
293 public void addTopic(String name) throws Exception {
294 safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name),true);
295 }
296
297 public void addQueue(String name) throws Exception {
298 safeGetBroker().getContextBroker().addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name),true);
299 }
300
301 public void removeTopic(String name) throws Exception {
302 safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000);
303 }
304
305 public void removeQueue(String name) throws Exception {
306 safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000);
307 }
308
309 public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName,
310 String selector) throws Exception {
311 ConnectionContext context = new ConnectionContext();
312 context.setBroker(safeGetBroker());
313 context.setClientId(clientId);
314 ConsumerInfo info = new ConsumerInfo();
315 ConsumerId consumerId = new ConsumerId();
316 consumerId.setConnectionId(clientId);
317 consumerId.setSessionId(sessionIdCounter.incrementAndGet());
318 consumerId.setValue(0);
319 info.setConsumerId(consumerId);
320 info.setDestination(new ActiveMQTopic(topicName));
321 info.setSubscriptionName(subscriberName);
322 info.setSelector(selector);
323 Subscription subscription = safeGetBroker().addConsumer(context, info);
324 safeGetBroker().removeConsumer(context, info);
325 if (subscription != null) {
326 return subscription.getObjectName();
327 }
328 return null;
329 }
330
331 public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception {
332 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
333 info.setClientId(clientId);
334 info.setSubscriptionName(subscriberName);
335 ConnectionContext context = new ConnectionContext();
336 context.setBroker(safeGetBroker());
337 context.setClientId(clientId);
338 safeGetBroker().removeSubscription(context, info);
339 }
340
341 // doc comment inherited from BrokerViewMBean
342 public void reloadLog4jProperties() throws Throwable {
343
344 // Avoid a direct dependency on log4j.. use reflection.
345 try {
346 ClassLoader cl = getClass().getClassLoader();
347 Class<?> logManagerClass = cl.loadClass("org.apache.log4j.LogManager");
348
349 Method resetConfiguration = logManagerClass.getMethod("resetConfiguration", new Class[]{});
350 resetConfiguration.invoke(null, new Object[]{});
351
352 String configurationOptionStr = System.getProperty("log4j.configuration");
353 URL log4jprops = null;
354 if (configurationOptionStr != null) {
355 try {
356 log4jprops = new URL(configurationOptionStr);
357 } catch (MalformedURLException ex) {
358 log4jprops = cl.getResource("log4j.properties");
359 }
360 } else {
361 log4jprops = cl.getResource("log4j.properties");
362 }
363
364 if (log4jprops != null) {
365 Class<?> propertyConfiguratorClass = cl.loadClass("org.apache.log4j.PropertyConfigurator");
366 Method configure = propertyConfiguratorClass.getMethod("configure", new Class[]{URL.class});
367 configure.invoke(null, new Object[]{log4jprops});
368 }
369 } catch (InvocationTargetException e) {
370 throw e.getTargetException();
371 }
372 }
373
374 public Map<String, String> getTransportConnectors() {
375 Map<String, String> answer = new HashMap<String, String>();
376 try {
377 for (TransportConnector connector : brokerService.getTransportConnectors()) {
378 answer.put(connector.getName(), connector.getConnectUri().toString());
379 }
380 } catch (Exception e) {
381 LOG.debug("Failed to read URI to build transport connectors map", e);
382 }
383 return answer;
384 }
385
386 @Override
387 public String getTransportConnectorByType(String type) {
388 return brokerService.getTransportConnectorURIsAsMap().get(type);
389 }
390
391 @Deprecated
392 /**
393 * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
394 */
395 public String getOpenWireURL() {
396 String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
397 return answer != null ? answer : "";
398 }
399
400 @Deprecated
401 /**
402 * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
403 */
404 public String getStompURL() {
405 String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
406 return answer != null ? answer : "";
407 }
408
409 @Deprecated
410 /**
411 * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
412 */
413 public String getSslURL() {
414 String answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
415 return answer != null ? answer : "";
416 }
417
418 @Deprecated
419 /**
420 * @deprecated use {@link #getTransportConnectors()} or {@link #getTransportConnectorByType(String)}
421 */
422 public String getStompSslURL() {
423 String answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
424 return answer != null ? answer : "";
425 }
426
427 public String getVMURL() {
428 URI answer = brokerService.getVmConnectorURI();
429 return answer != null ? answer.toString() : "";
430 }
431
432 public String getDataDirectory() {
433 File file = brokerService.getDataDirectoryFile();
434 try {
435 return file != null ? file.getCanonicalPath():"";
436 } catch (IOException e) {
437 return "";
438 }
439 }
440
441 public ObjectName getJMSJobScheduler() {
442 return this.jmsJobScheduler;
443 }
444
445 public void setJMSJobScheduler(ObjectName name) {
446 this.jmsJobScheduler=name;
447 }
448
449 private ManagedRegionBroker safeGetBroker() {
450 if (broker == null) {
451 throw new IllegalStateException("Broker is not yet started.");
452 }
453
454 return broker;
455 }
456 }