001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.broker;
018
019 import java.io.BufferedReader;
020 import java.io.File;
021 import java.io.IOException;
022 import java.io.InputStream;
023 import java.io.InputStreamReader;
024 import java.net.URI;
025 import java.net.URISyntaxException;
026 import java.net.UnknownHostException;
027 import java.util.ArrayList;
028 import java.util.Date;
029 import java.util.HashMap;
030 import java.util.HashSet;
031 import java.util.Iterator;
032 import java.util.List;
033 import java.util.Locale;
034 import java.util.Map;
035 import java.util.Set;
036 import java.util.concurrent.CopyOnWriteArrayList;
037 import java.util.concurrent.CountDownLatch;
038 import java.util.concurrent.LinkedBlockingQueue;
039 import java.util.concurrent.RejectedExecutionException;
040 import java.util.concurrent.RejectedExecutionHandler;
041 import java.util.concurrent.SynchronousQueue;
042 import java.util.concurrent.ThreadFactory;
043 import java.util.concurrent.ThreadPoolExecutor;
044 import java.util.concurrent.TimeUnit;
045 import java.util.concurrent.atomic.AtomicBoolean;
046
047 import javax.annotation.PostConstruct;
048 import javax.annotation.PreDestroy;
049 import javax.management.MalformedObjectNameException;
050 import javax.management.ObjectName;
051
052 import org.apache.activemq.ActiveMQConnectionMetaData;
053 import org.apache.activemq.ConfigurationException;
054 import org.apache.activemq.Service;
055 import org.apache.activemq.advisory.AdvisoryBroker;
056 import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
057 import org.apache.activemq.broker.ft.MasterConnector;
058 import org.apache.activemq.broker.jmx.AnnotatedMBean;
059 import org.apache.activemq.broker.jmx.BrokerView;
060 import org.apache.activemq.broker.jmx.ConnectorView;
061 import org.apache.activemq.broker.jmx.ConnectorViewMBean;
062 import org.apache.activemq.broker.jmx.FTConnectorView;
063 import org.apache.activemq.broker.jmx.JmsConnectorView;
064 import org.apache.activemq.broker.jmx.JobSchedulerView;
065 import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
066 import org.apache.activemq.broker.jmx.ManagedRegionBroker;
067 import org.apache.activemq.broker.jmx.ManagementContext;
068 import org.apache.activemq.broker.jmx.NetworkConnectorView;
069 import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
070 import org.apache.activemq.broker.jmx.ProxyConnectorView;
071 import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
072 import org.apache.activemq.broker.region.Destination;
073 import org.apache.activemq.broker.region.DestinationFactory;
074 import org.apache.activemq.broker.region.DestinationFactoryImpl;
075 import org.apache.activemq.broker.region.DestinationInterceptor;
076 import org.apache.activemq.broker.region.RegionBroker;
077 import org.apache.activemq.broker.region.policy.PolicyMap;
078 import org.apache.activemq.broker.region.virtual.MirroredQueue;
079 import org.apache.activemq.broker.region.virtual.VirtualDestination;
080 import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
081 import org.apache.activemq.broker.region.virtual.VirtualTopic;
082 import org.apache.activemq.broker.scheduler.SchedulerBroker;
083 import org.apache.activemq.command.ActiveMQDestination;
084 import org.apache.activemq.command.ActiveMQQueue;
085 import org.apache.activemq.command.BrokerId;
086 import org.apache.activemq.filter.DestinationFilter;
087 import org.apache.activemq.network.ConnectionFilter;
088 import org.apache.activemq.network.DiscoveryNetworkConnector;
089 import org.apache.activemq.network.NetworkConnector;
090 import org.apache.activemq.network.jms.JmsConnector;
091 import org.apache.activemq.proxy.ProxyConnector;
092 import org.apache.activemq.security.MessageAuthorizationPolicy;
093 import org.apache.activemq.selector.SelectorParser;
094 import org.apache.activemq.store.PersistenceAdapter;
095 import org.apache.activemq.store.PersistenceAdapterFactory;
096 import org.apache.activemq.store.amq.AMQPersistenceAdapter;
097 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
098 import org.apache.activemq.store.kahadb.plist.PListStore;
099 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
100 import org.apache.activemq.thread.Scheduler;
101 import org.apache.activemq.thread.TaskRunnerFactory;
102 import org.apache.activemq.transport.TransportFactory;
103 import org.apache.activemq.transport.TransportServer;
104 import org.apache.activemq.transport.stomp.ProtocolConverter;
105 import org.apache.activemq.transport.vm.VMTransportFactory;
106 import org.apache.activemq.usage.SystemUsage;
107 import org.apache.activemq.util.BrokerSupport;
108 import org.apache.activemq.util.DefaultIOExceptionHandler;
109 import org.apache.activemq.util.IOExceptionHandler;
110 import org.apache.activemq.util.IOExceptionSupport;
111 import org.apache.activemq.util.IOHelper;
112 import org.apache.activemq.util.InetAddressUtil;
113 import org.apache.activemq.util.JMXSupport;
114 import org.apache.activemq.util.ServiceStopper;
115 import org.apache.activemq.util.ThreadPoolUtils;
116 import org.apache.activemq.util.TimeUtils;
117 import org.apache.activemq.util.URISupport;
118 import org.slf4j.Logger;
119 import org.slf4j.LoggerFactory;
120 import org.slf4j.MDC;
121
122 /**
123 * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
124 * number of transport connectors, network connectors and a bunch of properties
125 * which can be used to configure the broker as its lazily created.
126 *
127 *
128 * @org.apache.xbean.XBean
129 */
130 public class BrokerService implements Service {
131 protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
132 public static final String DEFAULT_PORT = "61616";
133 public static final String LOCAL_HOST_NAME;
134 public static final String BROKER_VERSION;
135 public static final String DEFAULT_BROKER_NAME = "localhost";
136 private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
137 private static final long serialVersionUID = 7353129142305630237L;
138 private boolean useJmx = true;
139 private boolean enableStatistics = true;
140 private boolean persistent = true;
141 private boolean populateJMSXUserID;
142 private boolean useAuthenticatedPrincipalForJMSXUserID;
143 private boolean populateUserNameInMBeans;
144 private long mbeanInvocationTimeout = 0;
145
146 private boolean useShutdownHook = true;
147 private boolean useLoggingForShutdownErrors;
148 private boolean shutdownOnMasterFailure;
149 private boolean shutdownOnSlaveFailure;
150 private boolean waitForSlave;
151 private long waitForSlaveTimeout = 600000L;
152 private boolean passiveSlave;
153 private String brokerName = DEFAULT_BROKER_NAME;
154 private File dataDirectoryFile;
155 private File tmpDataDirectory;
156 private Broker broker;
157 private BrokerView adminView;
158 private ManagementContext managementContext;
159 private ObjectName brokerObjectName;
160 private TaskRunnerFactory taskRunnerFactory;
161 private TaskRunnerFactory persistenceTaskRunnerFactory;
162 private SystemUsage systemUsage;
163 private SystemUsage producerSystemUsage;
164 private SystemUsage consumerSystemUsaage;
165 private PersistenceAdapter persistenceAdapter;
166 private PersistenceAdapterFactory persistenceFactory;
167 protected DestinationFactory destinationFactory;
168 private MessageAuthorizationPolicy messageAuthorizationPolicy;
169 private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
170 private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
171 private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
172 private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
173 private final List<Service> services = new ArrayList<Service>();
174 private MasterConnector masterConnector;
175 private String masterConnectorURI;
176 private transient Thread shutdownHook;
177 private String[] transportConnectorURIs;
178 private String[] networkConnectorURIs;
179 private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
180 // to other jms messaging
181 // systems
182 private boolean deleteAllMessagesOnStartup;
183 private boolean advisorySupport = true;
184 private URI vmConnectorURI;
185 private String defaultSocketURIString;
186 private PolicyMap destinationPolicy;
187 private final AtomicBoolean started = new AtomicBoolean(false);
188 private final AtomicBoolean stopped = new AtomicBoolean(false);
189 private final AtomicBoolean stopping = new AtomicBoolean(false);
190 private BrokerPlugin[] plugins;
191 private boolean keepDurableSubsActive = true;
192 private boolean useVirtualTopics = true;
193 private boolean useMirroredQueues = false;
194 private boolean useTempMirroredQueues = true;
195 private BrokerId brokerId;
196 private DestinationInterceptor[] destinationInterceptors;
197 private ActiveMQDestination[] destinations;
198 private PListStore tempDataStore;
199 private int persistenceThreadPriority = Thread.MAX_PRIORITY;
200 private boolean useLocalHostBrokerName;
201 private final CountDownLatch stoppedLatch = new CountDownLatch(1);
202 private final CountDownLatch startedLatch = new CountDownLatch(1);
203 private boolean supportFailOver;
204 private Broker regionBroker;
205 private int producerSystemUsagePortion = 60;
206 private int consumerSystemUsagePortion = 40;
207 private boolean splitSystemUsageForProducersConsumers;
208 private boolean monitorConnectionSplits = false;
209 private int taskRunnerPriority = Thread.NORM_PRIORITY;
210 private boolean dedicatedTaskRunner;
211 private boolean cacheTempDestinations = false;// useful for failover
212 private int timeBeforePurgeTempDestinations = 5000;
213 private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
214 private boolean systemExitOnShutdown;
215 private int systemExitOnShutdownExitCode;
216 private SslContext sslContext;
217 private boolean forceStart = false;
218 private IOExceptionHandler ioExceptionHandler;
219 private boolean schedulerSupport = false;
220 private File schedulerDirectoryFile;
221 private Scheduler scheduler;
222 private ThreadPoolExecutor executor;
223 private boolean slave = true;
224 private int schedulePeriodForDestinationPurge= 0;
225 private int maxPurgedDestinationsPerSweep = 0;
226 private BrokerContext brokerContext;
227 private boolean networkConnectorStartAsync = false;
228 private boolean allowTempAutoCreationOnSend;
229
230 private int offlineDurableSubscriberTimeout = -1;
231 private int offlineDurableSubscriberTaskSchedule = 300000;
232 private DestinationFilter virtualConsumerDestinationFilter;
233
234 private final Object persistenceAdapterLock = new Object();
235 private Throwable startException = null;
236 private boolean startAsync = false;
237 private Date startDate;
238
239 static {
240 String localHostName = "localhost";
241 try {
242 localHostName = InetAddressUtil.getLocalHostName();
243 } catch (UnknownHostException e) {
244 LOG.error("Failed to resolve localhost");
245 }
246 LOCAL_HOST_NAME = localHostName;
247
248 InputStream in = null;
249 String version = null;
250 if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
251 BufferedReader reader = new BufferedReader(new InputStreamReader(in));
252 try {
253 version = reader.readLine();
254 } catch(Exception e) {
255 }
256 }
257 BROKER_VERSION = version;
258 }
259
260 @Override
261 public String toString() {
262 return "BrokerService[" + getBrokerName() + "]";
263 }
264
265 private String getBrokerVersion() {
266 String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
267 if (version == null) {
268 version = BROKER_VERSION;
269 }
270
271 return version;
272 }
273
274 /**
275 * Adds a new transport connector for the given bind address
276 *
277 * @return the newly created and added transport connector
278 * @throws Exception
279 */
280 public TransportConnector addConnector(String bindAddress) throws Exception {
281 return addConnector(new URI(bindAddress));
282 }
283
284 /**
285 * Adds a new transport connector for the given bind address
286 *
287 * @return the newly created and added transport connector
288 * @throws Exception
289 */
290 public TransportConnector addConnector(URI bindAddress) throws Exception {
291 return addConnector(createTransportConnector(bindAddress));
292 }
293
294 /**
295 * Adds a new transport connector for the given TransportServer transport
296 *
297 * @return the newly created and added transport connector
298 * @throws Exception
299 */
300 public TransportConnector addConnector(TransportServer transport) throws Exception {
301 return addConnector(new TransportConnector(transport));
302 }
303
304 /**
305 * Adds a new transport connector
306 *
307 * @return the transport connector
308 * @throws Exception
309 */
310 public TransportConnector addConnector(TransportConnector connector) throws Exception {
311 transportConnectors.add(connector);
312 return connector;
313 }
314
315 /**
316 * Stops and removes a transport connector from the broker.
317 *
318 * @param connector
319 * @return true if the connector has been previously added to the broker
320 * @throws Exception
321 */
322 public boolean removeConnector(TransportConnector connector) throws Exception {
323 boolean rc = transportConnectors.remove(connector);
324 if (rc) {
325 unregisterConnectorMBean(connector);
326 }
327 return rc;
328 }
329
330 /**
331 * Adds a new network connector using the given discovery address
332 *
333 * @return the newly created and added network connector
334 * @throws Exception
335 */
336 public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
337 return addNetworkConnector(new URI(discoveryAddress));
338 }
339
340 /**
341 * Adds a new proxy connector using the given bind address
342 *
343 * @return the newly created and added network connector
344 * @throws Exception
345 */
346 public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
347 return addProxyConnector(new URI(bindAddress));
348 }
349
350 /**
351 * Adds a new network connector using the given discovery address
352 *
353 * @return the newly created and added network connector
354 * @throws Exception
355 */
356 public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
357 NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
358 return addNetworkConnector(connector);
359 }
360
361 /**
362 * Adds a new proxy connector using the given bind address
363 *
364 * @return the newly created and added network connector
365 * @throws Exception
366 */
367 public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
368 ProxyConnector connector = new ProxyConnector();
369 connector.setBind(bindAddress);
370 connector.setRemote(new URI("fanout:multicast://default"));
371 return addProxyConnector(connector);
372 }
373
374 /**
375 * Adds a new network connector to connect this broker to a federated
376 * network
377 */
378 public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
379 connector.setBrokerService(this);
380 URI uri = getVmConnectorURI();
381 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
382 map.put("network", "true");
383 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
384 connector.setLocalUri(uri);
385 // Set a connection filter so that the connector does not establish loop
386 // back connections.
387 connector.setConnectionFilter(new ConnectionFilter() {
388 public boolean connectTo(URI location) {
389 List<TransportConnector> transportConnectors = getTransportConnectors();
390 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
391 try {
392 TransportConnector tc = iter.next();
393 if (location.equals(tc.getConnectUri())) {
394 return false;
395 }
396 } catch (Throwable e) {
397 }
398 }
399 return true;
400 }
401 });
402 networkConnectors.add(connector);
403 if (isUseJmx()) {
404 registerNetworkConnectorMBean(connector);
405 }
406 return connector;
407 }
408
409 /**
410 * Removes the given network connector without stopping it. The caller
411 * should call {@link NetworkConnector#stop()} to close the connector
412 */
413 public boolean removeNetworkConnector(NetworkConnector connector) {
414 boolean answer = networkConnectors.remove(connector);
415 if (answer) {
416 unregisterNetworkConnectorMBean(connector);
417 }
418 return answer;
419 }
420
421 public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
422 URI uri = getVmConnectorURI();
423 connector.setLocalUri(uri);
424 proxyConnectors.add(connector);
425 if (isUseJmx()) {
426 registerProxyConnectorMBean(connector);
427 }
428 return connector;
429 }
430
431 public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
432 connector.setBrokerService(this);
433 jmsConnectors.add(connector);
434 if (isUseJmx()) {
435 registerJmsConnectorMBean(connector);
436 }
437 return connector;
438 }
439
440 public JmsConnector removeJmsConnector(JmsConnector connector) {
441 if (jmsConnectors.remove(connector)) {
442 return connector;
443 }
444 return null;
445 }
446
447 /**
448 * @return Returns the masterConnectorURI.
449 */
450 public String getMasterConnectorURI() {
451 return masterConnectorURI;
452 }
453
454 /**
455 * @param masterConnectorURI
456 * The masterConnectorURI to set.
457 */
458 public void setMasterConnectorURI(String masterConnectorURI) {
459 this.masterConnectorURI = masterConnectorURI;
460 }
461
462 /**
463 * @return true if this Broker is a slave to a Master
464 */
465 public boolean isSlave() {
466 return (masterConnector != null && masterConnector.isSlave()) ||
467 (masterConnector != null && masterConnector.isStoppedBeforeStart()) ||
468 (masterConnector == null && slave);
469 }
470
471 public void masterFailed() {
472 if (shutdownOnMasterFailure) {
473 LOG.error("The Master has failed ... shutting down");
474 try {
475 stop();
476 } catch (Exception e) {
477 LOG.error("Failed to stop for master failure", e);
478 }
479 } else {
480 LOG.warn("Master Failed - starting all connectors");
481 try {
482 startAllConnectors();
483 broker.nowMasterBroker();
484 } catch (Exception e) {
485 LOG.error("Failed to startAllConnectors", e);
486 }
487 }
488 }
489
490 public String getUptime() {
491 // compute and log uptime
492 if (startDate == null) {
493 return "not started";
494 }
495 long delta = new Date().getTime() - startDate.getTime();
496 return TimeUtils.printDuration(delta);
497 }
498
499 public boolean isStarted() {
500 return started.get();
501 }
502
503 /**
504 * Forces a start of the broker.
505 * By default a BrokerService instance that was
506 * previously stopped using BrokerService.stop() cannot be restarted
507 * using BrokerService.start().
508 * This method enforces a restart.
509 * It is not recommended to force a restart of the broker and will not work
510 * for most but some very trivial broker configurations.
511 * For restarting a broker instance we recommend to first call stop() on
512 * the old instance and then recreate a new BrokerService instance.
513 *
514 * @param force - if true enforces a restart.
515 * @throws Exception
516 */
517 public void start(boolean force) throws Exception {
518 forceStart = force;
519 stopped.set(false);
520 started.set(false);
521 start();
522 }
523
524 // Service interface
525 // -------------------------------------------------------------------------
526
527 protected boolean shouldAutostart() {
528 return true;
529 }
530
531 /**
532 *
533 * @throws Exception
534 * @org. apache.xbean.InitMethod
535 */
536 @PostConstruct
537 public void autoStart() throws Exception {
538 if(shouldAutostart()) {
539 start();
540 }
541 }
542
543 public void start() throws Exception {
544 if (stopped.get() || !started.compareAndSet(false, true)) {
545 // lets just ignore redundant start() calls
546 // as its way too easy to not be completely sure if start() has been
547 // called or not with the gazillion of different configuration
548 // mechanisms
549 // throw new IllegalStateException("Already started.");
550 return;
551 }
552
553 stopping.set(false);
554 startDate = new Date();
555 MDC.put("activemq.broker", brokerName);
556
557 try {
558 if (systemExitOnShutdown && useShutdownHook) {
559 throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
560 }
561 processHelperProperties();
562 if (isUseJmx()) {
563 // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and
564 // we cannot cleanup clear that during shutdown of the broker.
565 MDC.remove("activemq.broker");
566 try {
567 startManagementContext();
568 } finally {
569 MDC.put("activemq.broker", brokerName);
570 }
571 }
572 // in jvm master slave, lets not publish over existing broker till we get the lock
573 final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
574 if (brokerRegistry.lookup(getBrokerName()) == null) {
575 brokerRegistry.bind(getBrokerName(), BrokerService.this);
576 }
577 startPersistenceAdapter(startAsync);
578 startBroker(startAsync);
579 brokerRegistry.bind(getBrokerName(), BrokerService.this);
580 } catch (Exception e) {
581 LOG.error("Failed to start Apache ActiveMQ (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e);
582 try {
583 if (!stopped.get()) {
584 stop();
585 }
586 } catch (Exception ex) {
587 LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
588 }
589 throw e;
590 } finally {
591 MDC.remove("activemq.broker");
592 }
593 }
594
595 private void startPersistenceAdapter(boolean async) throws Exception {
596 if (async) {
597 new Thread("Persistence Adapter Starting Thread") {
598 @Override
599 public void run() {
600 try {
601 doStartPersistenceAdapter();
602 } catch (Throwable e) {
603 startException = e;
604 } finally {
605 synchronized (persistenceAdapterLock) {
606 persistenceAdapterLock.notifyAll();
607 }
608 }
609 }
610 }.start();
611 } else {
612 doStartPersistenceAdapter();
613 }
614 }
615
616 private void doStartPersistenceAdapter() throws Exception {
617 getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
618 getPersistenceAdapter().setBrokerName(getBrokerName());
619 LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
620 if (deleteAllMessagesOnStartup) {
621 deleteAllMessages();
622 }
623 getPersistenceAdapter().start();
624 }
625
626 private void startBroker(boolean async) throws Exception {
627 if (async) {
628 new Thread("Broker Starting Thread") {
629 @Override
630 public void run() {
631 try {
632 synchronized (persistenceAdapterLock) {
633 persistenceAdapterLock.wait();
634 }
635 doStartBroker();
636 } catch (Throwable t) {
637 startException = t;
638 }
639 }
640 }.start();
641 } else {
642 doStartBroker();
643 }
644 }
645
646 private void doStartBroker() throws Exception {
647 if (startException != null) {
648 return;
649 }
650 slave = false;
651 startDestinations();
652 addShutdownHook();
653
654 broker = getBroker();
655 brokerId = broker.getBrokerId();
656
657 // need to log this after creating the broker so we have its id and name
658 if (LOG.isInfoEnabled()) {
659 LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
660 + getBrokerName() + ", " + brokerId + ") is starting");
661 }
662 broker.start();
663
664 if (isUseJmx()) {
665 if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
666 // try to restart management context
667 // typical for slaves that use the same ports as master
668 managementContext.stop();
669 startManagementContext();
670 }
671 ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
672 managedBroker.setContextBroker(broker);
673 adminView.setBroker(managedBroker);
674 }
675 // see if there is a MasterBroker service and if so, configure
676 // it and start it.
677 for (Service service : services) {
678 if (service instanceof MasterConnector) {
679 configureService(service);
680 service.start();
681 }
682 }
683 if (!isSlave() && (masterConnector == null || isShutdownOnMasterFailure() == false)) {
684 startAllConnectors();
685 }
686 if (!stopped.get()) {
687 if (isUseJmx() && masterConnector != null) {
688 registerFTConnectorMBean(masterConnector);
689 }
690 }
691 if (ioExceptionHandler == null) {
692 setIoExceptionHandler(new DefaultIOExceptionHandler());
693 }
694
695 if (LOG.isInfoEnabled()) {
696 LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
697 + getBrokerName() + ", " + brokerId + ") started");
698 LOG.info("For help or more information please see: http://activemq.apache.org");
699 }
700
701 getBroker().brokerServiceStarted();
702 checkSystemUsageLimits();
703 startedLatch.countDown();
704 }
705
706 /**
707 *
708 * @throws Exception
709 * @org.apache .xbean.DestroyMethod
710 */
711 @PreDestroy
712 public void stop() throws Exception {
713 if (!stopping.compareAndSet(false, true)) {
714 LOG.trace("Broker already stopping/stopped");
715 return;
716 }
717
718 MDC.put("activemq.broker", brokerName);
719
720 if (systemExitOnShutdown) {
721 new Thread() {
722 @Override
723 public void run() {
724 System.exit(systemExitOnShutdownExitCode);
725 }
726 }.start();
727 }
728
729 if (LOG.isInfoEnabled()) {
730 LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
731 + getBrokerName() + ", " + brokerId + ") is shutting down");
732 }
733
734 removeShutdownHook();
735 if (this.scheduler != null) {
736 this.scheduler.stop();
737 this.scheduler = null;
738 }
739 ServiceStopper stopper = new ServiceStopper();
740 if (services != null) {
741 for (Service service : services) {
742 stopper.stop(service);
743 }
744 }
745 stopAllConnectors(stopper);
746 // remove any VMTransports connected
747 // this has to be done after services are stopped,
748 // to avoid timing issue with discovery (spinning up a new instance)
749 BrokerRegistry.getInstance().unbind(getBrokerName());
750 VMTransportFactory.stopped(getBrokerName());
751 if (broker != null) {
752 stopper.stop(broker);
753 broker = null;
754 }
755
756 if (tempDataStore != null) {
757 tempDataStore.stop();
758 tempDataStore = null;
759 }
760 try {
761 stopper.stop(persistenceAdapter);
762 persistenceAdapter = null;
763 slave = true;
764 if (isUseJmx()) {
765 stopper.stop(getManagementContext());
766 managementContext = null;
767 }
768 // Clear SelectorParser cache to free memory
769 SelectorParser.clearCache();
770 } finally {
771 stopped.set(true);
772 stoppedLatch.countDown();
773 }
774 if (masterConnectorURI == null) {
775 // master start has not finished yet
776 if (slaveStartSignal.getCount() == 1) {
777 started.set(false);
778 slaveStartSignal.countDown();
779 }
780 } else {
781 for (Service service : services) {
782 if (service instanceof MasterConnector) {
783 MasterConnector mConnector = (MasterConnector) service;
784 if (!mConnector.isSlave()) {
785 // means should be slave but not connected to master yet
786 started.set(false);
787 mConnector.stopBeforeConnected();
788 }
789 }
790 }
791 }
792 if (this.taskRunnerFactory != null) {
793 this.taskRunnerFactory.shutdown();
794 this.taskRunnerFactory = null;
795 }
796 if (this.executor != null) {
797 ThreadPoolUtils.shutdownNow(executor);
798 this.executor = null;
799 }
800
801 this.destinationInterceptors = null;
802 this.destinationFactory = null;
803
804 if (LOG.isInfoEnabled()) {
805 if (startDate != null) {
806 LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
807 + getBrokerName() + ", " + brokerId + ") uptime " + getUptime());
808 }
809 LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
810 + getBrokerName() + ", " + brokerId + ") is shutdown");
811 }
812
813 synchronized (shutdownHooks) {
814 for (Runnable hook : shutdownHooks) {
815 try {
816 hook.run();
817 } catch (Throwable e) {
818 stopper.onException(hook, e);
819 }
820 }
821 }
822
823 MDC.remove("activemq.broker");
824
825 // and clear start date
826 startDate = null;
827
828 stopper.throwFirstException();
829 }
830
831 public boolean checkQueueSize(String queueName) {
832 long count = 0;
833 long queueSize = 0;
834 Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
835 for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
836 if (entry.getKey().isQueue()) {
837 if (entry.getValue().getName().matches(queueName)) {
838 queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
839 count += queueSize;
840 if (queueSize > 0) {
841 LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:"
842 + queueSize);
843 }
844 }
845 }
846 }
847 return count == 0;
848 }
849
850 /**
851 * This method (both connectorName and queueName are using regex to match)
852 * 1. stop the connector (supposed the user input the connector which the
853 * clients connect to) 2. to check whether there is any pending message on
854 * the queues defined by queueName 3. supposedly, after stop the connector,
855 * client should failover to other broker and pending messages should be
856 * forwarded. if no pending messages, the method finally call stop to stop
857 * the broker.
858 *
859 * @param connectorName
860 * @param queueName
861 * @param timeout
862 * @param pollInterval
863 * @throws Exception
864 */
865 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
866 throws Exception {
867 if (isUseJmx()) {
868 if (connectorName == null || queueName == null || timeout <= 0) {
869 throw new Exception(
870 "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
871 }
872 if (pollInterval <= 0) {
873 pollInterval = 30;
874 }
875 LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:"
876 + timeout + " pollInterval:" + pollInterval);
877 TransportConnector connector;
878 for (int i = 0; i < transportConnectors.size(); i++) {
879 connector = transportConnectors.get(i);
880 if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
881 connector.stop();
882 }
883 }
884 long start = System.currentTimeMillis();
885 while (System.currentTimeMillis() - start < timeout * 1000) {
886 // check quesize until it gets zero
887 if (checkQueueSize(queueName)) {
888 stop();
889 break;
890 } else {
891 Thread.sleep(pollInterval * 1000);
892 }
893 }
894 if (stopped.get()) {
895 LOG.info("Successfully stop the broker.");
896 } else {
897 LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
898 }
899 }
900 }
901
902 /**
903 * A helper method to block the caller thread until the broker has been
904 * stopped
905 */
906 public void waitUntilStopped() {
907 while (isStarted() && !stopped.get()) {
908 try {
909 stoppedLatch.await();
910 } catch (InterruptedException e) {
911 // ignore
912 }
913 }
914 }
915
916 /**
917 * A helper method to block the caller thread until the broker has fully started
918 * @return boolean true if wait succeeded false if broker was not started or was stopped
919 */
920 public boolean waitUntilStarted() {
921 boolean waitSucceeded = false;
922 while (isStarted() && !stopped.get() && !waitSucceeded) {
923 try {
924 if (startException != null) {
925 return waitSucceeded;
926 }
927 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
928 } catch (InterruptedException ignore) {
929 }
930 }
931 return waitSucceeded;
932 }
933
934 // Properties
935 // -------------------------------------------------------------------------
936 /**
937 * Returns the message broker
938 */
939 public Broker getBroker() throws Exception {
940 if (broker == null) {
941 broker = createBroker();
942 }
943 return broker;
944 }
945
946 /**
947 * Returns the administration view of the broker; used to create and destroy
948 * resources such as queues and topics. Note this method returns null if JMX
949 * is disabled.
950 */
951 public BrokerView getAdminView() throws Exception {
952 if (adminView == null) {
953 // force lazy creation
954 getBroker();
955 }
956 return adminView;
957 }
958
959 public void setAdminView(BrokerView adminView) {
960 this.adminView = adminView;
961 }
962
963 public String getBrokerName() {
964 return brokerName;
965 }
966
967 /**
968 * Sets the name of this broker; which must be unique in the network
969 *
970 * @param brokerName
971 */
972 public void setBrokerName(String brokerName) {
973 if (brokerName == null) {
974 throw new NullPointerException("The broker name cannot be null");
975 }
976 String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
977 if (!str.equals(brokerName)) {
978 LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
979 }
980 this.brokerName = str.trim();
981 }
982
983 public PersistenceAdapterFactory getPersistenceFactory() {
984 return persistenceFactory;
985 }
986
987 public File getDataDirectoryFile() {
988 if (dataDirectoryFile == null) {
989 dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
990 }
991 return dataDirectoryFile;
992 }
993
994 public File getBrokerDataDirectory() {
995 String brokerDir = getBrokerName();
996 return new File(getDataDirectoryFile(), brokerDir);
997 }
998
999 /**
1000 * Sets the directory in which the data files will be stored by default for
1001 * the JDBC and Journal persistence adaptors.
1002 *
1003 * @param dataDirectory
1004 * the directory to store data files
1005 */
1006 public void setDataDirectory(String dataDirectory) {
1007 setDataDirectoryFile(new File(dataDirectory));
1008 }
1009
1010 /**
1011 * Sets the directory in which the data files will be stored by default for
1012 * the JDBC and Journal persistence adaptors.
1013 *
1014 * @param dataDirectoryFile
1015 * the directory to store data files
1016 */
1017 public void setDataDirectoryFile(File dataDirectoryFile) {
1018 this.dataDirectoryFile = dataDirectoryFile;
1019 }
1020
1021 /**
1022 * @return the tmpDataDirectory
1023 */
1024 public File getTmpDataDirectory() {
1025 if (tmpDataDirectory == null) {
1026 tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
1027 }
1028 return tmpDataDirectory;
1029 }
1030
1031 /**
1032 * @param tmpDataDirectory
1033 * the tmpDataDirectory to set
1034 */
1035 public void setTmpDataDirectory(File tmpDataDirectory) {
1036 this.tmpDataDirectory = tmpDataDirectory;
1037 }
1038
1039 public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
1040 this.persistenceFactory = persistenceFactory;
1041 }
1042
1043 public void setDestinationFactory(DestinationFactory destinationFactory) {
1044 this.destinationFactory = destinationFactory;
1045 }
1046
1047 public boolean isPersistent() {
1048 return persistent;
1049 }
1050
1051 /**
1052 * Sets whether or not persistence is enabled or disabled.
1053 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1054 */
1055 public void setPersistent(boolean persistent) {
1056 this.persistent = persistent;
1057 }
1058
1059 public boolean isPopulateJMSXUserID() {
1060 return populateJMSXUserID;
1061 }
1062
1063 /**
1064 * Sets whether or not the broker should populate the JMSXUserID header.
1065 */
1066 public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
1067 this.populateJMSXUserID = populateJMSXUserID;
1068 }
1069
1070 public SystemUsage getSystemUsage() {
1071 try {
1072 if (systemUsage == null) {
1073 systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
1074 systemUsage.setExecutor(getExecutor());
1075 systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // 64 MB
1076 systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1077 systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
1078 addService(this.systemUsage);
1079 }
1080 return systemUsage;
1081 } catch (IOException e) {
1082 LOG.error("Cannot create SystemUsage", e);
1083 throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
1084 }
1085 }
1086
1087 public void setSystemUsage(SystemUsage memoryManager) {
1088 if (this.systemUsage != null) {
1089 removeService(this.systemUsage);
1090 }
1091 this.systemUsage = memoryManager;
1092 if (this.systemUsage.getExecutor()==null) {
1093 this.systemUsage.setExecutor(getExecutor());
1094 }
1095 addService(this.systemUsage);
1096 }
1097
1098 /**
1099 * @return the consumerUsageManager
1100 * @throws IOException
1101 */
1102 public SystemUsage getConsumerSystemUsage() throws IOException {
1103 if (this.consumerSystemUsaage == null) {
1104 if (splitSystemUsageForProducersConsumers) {
1105 this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
1106 float portion = consumerSystemUsagePortion / 100f;
1107 this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
1108 addService(this.consumerSystemUsaage);
1109 } else {
1110 consumerSystemUsaage = getSystemUsage();
1111 }
1112 }
1113 return this.consumerSystemUsaage;
1114 }
1115
1116 /**
1117 * @param consumerSystemUsaage
1118 * the storeSystemUsage to set
1119 */
1120 public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
1121 if (this.consumerSystemUsaage != null) {
1122 removeService(this.consumerSystemUsaage);
1123 }
1124 this.consumerSystemUsaage = consumerSystemUsaage;
1125 addService(this.consumerSystemUsaage);
1126 }
1127
1128 /**
1129 * @return the producerUsageManager
1130 * @throws IOException
1131 */
1132 public SystemUsage getProducerSystemUsage() throws IOException {
1133 if (producerSystemUsage == null) {
1134 if (splitSystemUsageForProducersConsumers) {
1135 producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
1136 float portion = producerSystemUsagePortion / 100f;
1137 producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
1138 addService(producerSystemUsage);
1139 } else {
1140 producerSystemUsage = getSystemUsage();
1141 }
1142 }
1143 return producerSystemUsage;
1144 }
1145
1146 /**
1147 * @param producerUsageManager
1148 * the producerUsageManager to set
1149 */
1150 public void setProducerSystemUsage(SystemUsage producerUsageManager) {
1151 if (this.producerSystemUsage != null) {
1152 removeService(this.producerSystemUsage);
1153 }
1154 this.producerSystemUsage = producerUsageManager;
1155 addService(this.producerSystemUsage);
1156 }
1157
1158 public PersistenceAdapter getPersistenceAdapter() throws IOException {
1159 if (persistenceAdapter == null) {
1160 persistenceAdapter = createPersistenceAdapter();
1161 configureService(persistenceAdapter);
1162 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1163 }
1164 return persistenceAdapter;
1165 }
1166
1167 /**
1168 * Sets the persistence adaptor implementation to use for this broker
1169 *
1170 * @throws IOException
1171 */
1172 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1173 if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) {
1174 LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: " + persistenceAdapter);
1175 return;
1176 }
1177 this.persistenceAdapter = persistenceAdapter;
1178 configureService(this.persistenceAdapter);
1179 this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1180 }
1181
1182 public TaskRunnerFactory getTaskRunnerFactory() {
1183 if (this.taskRunnerFactory == null) {
1184 this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1185 isDedicatedTaskRunner());
1186 }
1187 return this.taskRunnerFactory;
1188 }
1189
1190 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
1191 this.taskRunnerFactory = taskRunnerFactory;
1192 }
1193
1194 public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1195 if (taskRunnerFactory == null) {
1196 persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1197 true, 1000, isDedicatedTaskRunner());
1198 }
1199 return persistenceTaskRunnerFactory;
1200 }
1201
1202 public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
1203 this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
1204 }
1205
1206 public boolean isUseJmx() {
1207 return useJmx;
1208 }
1209
1210 public boolean isEnableStatistics() {
1211 return enableStatistics;
1212 }
1213
1214 /**
1215 * Sets whether or not the Broker's services enable statistics or not.
1216 */
1217 public void setEnableStatistics(boolean enableStatistics) {
1218 this.enableStatistics = enableStatistics;
1219 }
1220
1221 /**
1222 * Sets whether or not the Broker's services should be exposed into JMX or
1223 * not.
1224 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1225 */
1226 public void setUseJmx(boolean useJmx) {
1227 this.useJmx = useJmx;
1228 }
1229
1230 public ObjectName getBrokerObjectName() throws IOException {
1231 if (brokerObjectName == null) {
1232 brokerObjectName = createBrokerObjectName();
1233 }
1234 return brokerObjectName;
1235 }
1236
1237 /**
1238 * Sets the JMX ObjectName for this broker
1239 */
1240 public void setBrokerObjectName(ObjectName brokerObjectName) {
1241 this.brokerObjectName = brokerObjectName;
1242 }
1243
1244 public ManagementContext getManagementContext() {
1245 if (managementContext == null) {
1246 managementContext = new ManagementContext();
1247 }
1248 return managementContext;
1249 }
1250
1251 public void setManagementContext(ManagementContext managementContext) {
1252 this.managementContext = managementContext;
1253 }
1254
1255 public NetworkConnector getNetworkConnectorByName(String connectorName) {
1256 for (NetworkConnector connector : networkConnectors) {
1257 if (connector.getName().equals(connectorName)) {
1258 return connector;
1259 }
1260 }
1261 return null;
1262 }
1263
1264 public String[] getNetworkConnectorURIs() {
1265 return networkConnectorURIs;
1266 }
1267
1268 public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1269 this.networkConnectorURIs = networkConnectorURIs;
1270 }
1271
1272 public TransportConnector getConnectorByName(String connectorName) {
1273 for (TransportConnector connector : transportConnectors) {
1274 if (connector.getName().equals(connectorName)) {
1275 return connector;
1276 }
1277 }
1278 return null;
1279 }
1280
1281 public Map<String, String> getTransportConnectorURIsAsMap() {
1282 Map<String, String> answer = new HashMap<String, String>();
1283 for (TransportConnector connector : transportConnectors) {
1284 try {
1285 URI uri = connector.getConnectUri();
1286 if (uri != null) {
1287 String scheme = uri.getScheme();
1288 if (scheme != null) {
1289 answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString());
1290 }
1291 }
1292 } catch (Exception e) {
1293 LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1294 }
1295 }
1296 return answer;
1297 }
1298
1299 public String[] getTransportConnectorURIs() {
1300 return transportConnectorURIs;
1301 }
1302
1303 public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1304 this.transportConnectorURIs = transportConnectorURIs;
1305 }
1306
1307 /**
1308 * @return Returns the jmsBridgeConnectors.
1309 */
1310 public JmsConnector[] getJmsBridgeConnectors() {
1311 return jmsBridgeConnectors;
1312 }
1313
1314 /**
1315 * @param jmsConnectors
1316 * The jmsBridgeConnectors to set.
1317 */
1318 public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1319 this.jmsBridgeConnectors = jmsConnectors;
1320 }
1321
1322 public Service[] getServices() {
1323 return services.toArray(new Service[0]);
1324 }
1325
1326 /**
1327 * Sets the services associated with this broker such as a
1328 * {@link MasterConnector}
1329 */
1330 public void setServices(Service[] services) {
1331 this.services.clear();
1332 if (services != null) {
1333 for (int i = 0; i < services.length; i++) {
1334 this.services.add(services[i]);
1335 }
1336 }
1337 }
1338
1339 /**
1340 * Adds a new service so that it will be started as part of the broker
1341 * lifecycle
1342 */
1343 public void addService(Service service) {
1344 services.add(service);
1345 }
1346
1347 public void removeService(Service service) {
1348 services.remove(service);
1349 }
1350
1351 public boolean isUseLoggingForShutdownErrors() {
1352 return useLoggingForShutdownErrors;
1353 }
1354
1355 /**
1356 * Sets whether or not we should use commons-logging when reporting errors
1357 * when shutting down the broker
1358 */
1359 public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1360 this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1361 }
1362
1363 public boolean isUseShutdownHook() {
1364 return useShutdownHook;
1365 }
1366
1367 /**
1368 * Sets whether or not we should use a shutdown handler to close down the
1369 * broker cleanly if the JVM is terminated. It is recommended you leave this
1370 * enabled.
1371 */
1372 public void setUseShutdownHook(boolean useShutdownHook) {
1373 this.useShutdownHook = useShutdownHook;
1374 }
1375
1376 public boolean isAdvisorySupport() {
1377 return advisorySupport;
1378 }
1379
1380 /**
1381 * Allows the support of advisory messages to be disabled for performance
1382 * reasons.
1383 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1384 */
1385 public void setAdvisorySupport(boolean advisorySupport) {
1386 this.advisorySupport = advisorySupport;
1387 }
1388
1389 public List<TransportConnector> getTransportConnectors() {
1390 return new ArrayList<TransportConnector>(transportConnectors);
1391 }
1392
1393 /**
1394 * Sets the transport connectors which this broker will listen on for new
1395 * clients
1396 *
1397 * @org.apache.xbean.Property
1398 * nestedType="org.apache.activemq.broker.TransportConnector"
1399 */
1400 public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1401 for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
1402 TransportConnector connector = iter.next();
1403 addConnector(connector);
1404 }
1405 }
1406
1407 public TransportConnector getTransportConnectorByName(String name){
1408 for (TransportConnector transportConnector:transportConnectors){
1409 if (name.equals(transportConnector.getName())){
1410 return transportConnector;
1411 }
1412 }
1413 return null;
1414 }
1415
1416 public TransportConnector getTransportConnectorByScheme(String scheme){
1417 for (TransportConnector transportConnector:transportConnectors){
1418 if (scheme.equals(transportConnector.getUri().getScheme())){
1419 return transportConnector;
1420 }
1421 }
1422 return null;
1423 }
1424
1425 public List<NetworkConnector> getNetworkConnectors() {
1426 return new ArrayList<NetworkConnector>(networkConnectors);
1427 }
1428
1429 public List<ProxyConnector> getProxyConnectors() {
1430 return new ArrayList<ProxyConnector>(proxyConnectors);
1431 }
1432
1433 /**
1434 * Sets the network connectors which this broker will use to connect to
1435 * other brokers in a federated network
1436 *
1437 * @org.apache.xbean.Property
1438 * nestedType="org.apache.activemq.network.NetworkConnector"
1439 */
1440 public void setNetworkConnectors(List networkConnectors) throws Exception {
1441 for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
1442 NetworkConnector connector = (NetworkConnector) iter.next();
1443 addNetworkConnector(connector);
1444 }
1445 }
1446
1447 /**
1448 * Sets the network connectors which this broker will use to connect to
1449 * other brokers in a federated network
1450 */
1451 public void setProxyConnectors(List proxyConnectors) throws Exception {
1452 for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
1453 ProxyConnector connector = (ProxyConnector) iter.next();
1454 addProxyConnector(connector);
1455 }
1456 }
1457
1458 public PolicyMap getDestinationPolicy() {
1459 return destinationPolicy;
1460 }
1461
1462 /**
1463 * Sets the destination specific policies available either for exact
1464 * destinations or for wildcard areas of destinations.
1465 */
1466 public void setDestinationPolicy(PolicyMap policyMap) {
1467 this.destinationPolicy = policyMap;
1468 }
1469
1470 public BrokerPlugin[] getPlugins() {
1471 return plugins;
1472 }
1473
1474 /**
1475 * Sets a number of broker plugins to install such as for security
1476 * authentication or authorization
1477 */
1478 public void setPlugins(BrokerPlugin[] plugins) {
1479 this.plugins = plugins;
1480 }
1481
1482 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1483 return messageAuthorizationPolicy;
1484 }
1485
1486 /**
1487 * Sets the policy used to decide if the current connection is authorized to
1488 * consume a given message
1489 */
1490 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1491 this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1492 }
1493
1494 /**
1495 * Delete all messages from the persistent store
1496 *
1497 * @throws IOException
1498 */
1499 public void deleteAllMessages() throws IOException {
1500 getPersistenceAdapter().deleteAllMessages();
1501 }
1502
1503 public boolean isDeleteAllMessagesOnStartup() {
1504 return deleteAllMessagesOnStartup;
1505 }
1506
1507 /**
1508 * Sets whether or not all messages are deleted on startup - mostly only
1509 * useful for testing.
1510 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1511 */
1512 public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1513 this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1514 }
1515
1516 public URI getVmConnectorURI() {
1517 if (vmConnectorURI == null) {
1518 try {
1519 vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1520 } catch (URISyntaxException e) {
1521 LOG.error("Badly formed URI from " + getBrokerName(), e);
1522 }
1523 }
1524 return vmConnectorURI;
1525 }
1526
1527 public void setVmConnectorURI(URI vmConnectorURI) {
1528 this.vmConnectorURI = vmConnectorURI;
1529 }
1530
1531 public String getDefaultSocketURIString() {
1532
1533 if (started.get()) {
1534 if (this.defaultSocketURIString == null) {
1535 for (TransportConnector tc:this.transportConnectors) {
1536 String result = null;
1537 try {
1538 result = tc.getPublishableConnectString();
1539 } catch (Exception e) {
1540 LOG.warn("Failed to get the ConnectURI for "+tc,e);
1541 }
1542 if (result != null) {
1543 // find first publishable uri
1544 if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
1545 this.defaultSocketURIString = result;
1546 break;
1547 } else {
1548 // or use the first defined
1549 if (this.defaultSocketURIString == null) {
1550 this.defaultSocketURIString = result;
1551 }
1552 }
1553 }
1554 }
1555
1556 }
1557 return this.defaultSocketURIString;
1558 }
1559 return null;
1560 }
1561
1562 /**
1563 * @return Returns the shutdownOnMasterFailure.
1564 */
1565 public boolean isShutdownOnMasterFailure() {
1566 return shutdownOnMasterFailure;
1567 }
1568
1569 /**
1570 * @param shutdownOnMasterFailure
1571 * The shutdownOnMasterFailure to set.
1572 */
1573 public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1574 this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1575 }
1576
1577 public boolean isKeepDurableSubsActive() {
1578 return keepDurableSubsActive;
1579 }
1580
1581 public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1582 this.keepDurableSubsActive = keepDurableSubsActive;
1583 }
1584
1585 public boolean isUseVirtualTopics() {
1586 return useVirtualTopics;
1587 }
1588
1589 /**
1590 * Sets whether or not <a
1591 * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1592 * Topics</a> should be supported by default if they have not been
1593 * explicitly configured.
1594 */
1595 public void setUseVirtualTopics(boolean useVirtualTopics) {
1596 this.useVirtualTopics = useVirtualTopics;
1597 }
1598
1599 public DestinationInterceptor[] getDestinationInterceptors() {
1600 return destinationInterceptors;
1601 }
1602
1603 public boolean isUseMirroredQueues() {
1604 return useMirroredQueues;
1605 }
1606
1607 /**
1608 * Sets whether or not <a
1609 * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1610 * Queues</a> should be supported by default if they have not been
1611 * explicitly configured.
1612 */
1613 public void setUseMirroredQueues(boolean useMirroredQueues) {
1614 this.useMirroredQueues = useMirroredQueues;
1615 }
1616
1617 /**
1618 * Sets the destination interceptors to use
1619 */
1620 public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1621 this.destinationInterceptors = destinationInterceptors;
1622 }
1623
1624 public ActiveMQDestination[] getDestinations() {
1625 return destinations;
1626 }
1627
1628 /**
1629 * Sets the destinations which should be loaded/created on startup
1630 */
1631 public void setDestinations(ActiveMQDestination[] destinations) {
1632 this.destinations = destinations;
1633 }
1634
1635 /**
1636 * @return the tempDataStore
1637 */
1638 public synchronized PListStore getTempDataStore() {
1639 if (tempDataStore == null) {
1640 if (!isPersistent()) {
1641 return null;
1642 }
1643 boolean result = true;
1644 boolean empty = true;
1645 try {
1646 File directory = getTmpDataDirectory();
1647 if (directory.exists() && directory.isDirectory()) {
1648 File[] files = directory.listFiles();
1649 if (files != null && files.length > 0) {
1650 empty = false;
1651 for (int i = 0; i < files.length; i++) {
1652 File file = files[i];
1653 if (!file.isDirectory()) {
1654 result &= file.delete();
1655 }
1656 }
1657 }
1658 }
1659 if (!empty) {
1660 String str = result ? "Successfully deleted" : "Failed to delete";
1661 LOG.info(str + " temporary storage");
1662 }
1663 this.tempDataStore = new PListStore();
1664 this.tempDataStore.setDirectory(getTmpDataDirectory());
1665 configureService(tempDataStore);
1666 this.tempDataStore.start();
1667 } catch (Exception e) {
1668 throw new RuntimeException(e);
1669 }
1670 }
1671 return tempDataStore;
1672 }
1673
1674 /**
1675 * @param tempDataStore
1676 * the tempDataStore to set
1677 */
1678 public void setTempDataStore(PListStore tempDataStore) {
1679 this.tempDataStore = tempDataStore;
1680 configureService(tempDataStore);
1681 try {
1682 tempDataStore.start();
1683 } catch (Exception e) {
1684 RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e);
1685 LOG.error(exception.getLocalizedMessage(), e);
1686 throw exception;
1687 }
1688 }
1689
1690 public int getPersistenceThreadPriority() {
1691 return persistenceThreadPriority;
1692 }
1693
1694 public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1695 this.persistenceThreadPriority = persistenceThreadPriority;
1696 }
1697
1698 /**
1699 * @return the useLocalHostBrokerName
1700 */
1701 public boolean isUseLocalHostBrokerName() {
1702 return this.useLocalHostBrokerName;
1703 }
1704
1705 /**
1706 * @param useLocalHostBrokerName
1707 * the useLocalHostBrokerName to set
1708 */
1709 public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1710 this.useLocalHostBrokerName = useLocalHostBrokerName;
1711 if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1712 brokerName = LOCAL_HOST_NAME;
1713 }
1714 }
1715
1716 /**
1717 * @return the supportFailOver
1718 */
1719 public boolean isSupportFailOver() {
1720 return this.supportFailOver;
1721 }
1722
1723 /**
1724 * @param supportFailOver
1725 * the supportFailOver to set
1726 */
1727 public void setSupportFailOver(boolean supportFailOver) {
1728 this.supportFailOver = supportFailOver;
1729 }
1730
1731 /**
1732 * Looks up and lazily creates if necessary the destination for the given
1733 * JMS name
1734 */
1735 public Destination getDestination(ActiveMQDestination destination) throws Exception {
1736 return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1737 }
1738
1739 public void removeDestination(ActiveMQDestination destination) throws Exception {
1740 getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1741 }
1742
1743 public int getProducerSystemUsagePortion() {
1744 return producerSystemUsagePortion;
1745 }
1746
1747 public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1748 this.producerSystemUsagePortion = producerSystemUsagePortion;
1749 }
1750
1751 public int getConsumerSystemUsagePortion() {
1752 return consumerSystemUsagePortion;
1753 }
1754
1755 public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1756 this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1757 }
1758
1759 public boolean isSplitSystemUsageForProducersConsumers() {
1760 return splitSystemUsageForProducersConsumers;
1761 }
1762
1763 public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1764 this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1765 }
1766
1767 public boolean isMonitorConnectionSplits() {
1768 return monitorConnectionSplits;
1769 }
1770
1771 public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1772 this.monitorConnectionSplits = monitorConnectionSplits;
1773 }
1774
1775 public int getTaskRunnerPriority() {
1776 return taskRunnerPriority;
1777 }
1778
1779 public void setTaskRunnerPriority(int taskRunnerPriority) {
1780 this.taskRunnerPriority = taskRunnerPriority;
1781 }
1782
1783 public boolean isDedicatedTaskRunner() {
1784 return dedicatedTaskRunner;
1785 }
1786
1787 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1788 this.dedicatedTaskRunner = dedicatedTaskRunner;
1789 }
1790
1791 public boolean isCacheTempDestinations() {
1792 return cacheTempDestinations;
1793 }
1794
1795 public void setCacheTempDestinations(boolean cacheTempDestinations) {
1796 this.cacheTempDestinations = cacheTempDestinations;
1797 }
1798
1799 public int getTimeBeforePurgeTempDestinations() {
1800 return timeBeforePurgeTempDestinations;
1801 }
1802
1803 public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1804 this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1805 }
1806
1807 public boolean isUseTempMirroredQueues() {
1808 return useTempMirroredQueues;
1809 }
1810
1811 public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1812 this.useTempMirroredQueues = useTempMirroredQueues;
1813 }
1814
1815 //
1816 // Implementation methods
1817 // -------------------------------------------------------------------------
1818 /**
1819 * Handles any lazy-creation helper properties which are added to make
1820 * things easier to configure inside environments such as Spring
1821 *
1822 * @throws Exception
1823 */
1824 protected void processHelperProperties() throws Exception {
1825 boolean masterServiceExists = false;
1826 if (transportConnectorURIs != null) {
1827 for (int i = 0; i < transportConnectorURIs.length; i++) {
1828 String uri = transportConnectorURIs[i];
1829 addConnector(uri);
1830 }
1831 }
1832 if (networkConnectorURIs != null) {
1833 for (int i = 0; i < networkConnectorURIs.length; i++) {
1834 String uri = networkConnectorURIs[i];
1835 addNetworkConnector(uri);
1836 }
1837 }
1838 if (jmsBridgeConnectors != null) {
1839 for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1840 addJmsConnector(jmsBridgeConnectors[i]);
1841 }
1842 }
1843 for (Service service : services) {
1844 if (service instanceof MasterConnector) {
1845 masterServiceExists = true;
1846 break;
1847 }
1848 }
1849 if (masterConnectorURI != null) {
1850 if (masterServiceExists) {
1851 throw new IllegalStateException(
1852 "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
1853 } else {
1854 addService(new MasterConnector(masterConnectorURI));
1855 }
1856 }
1857 }
1858
1859 protected void checkSystemUsageLimits() throws IOException {
1860 SystemUsage usage = getSystemUsage();
1861 long memLimit = usage.getMemoryUsage().getLimit();
1862 long jvmLimit = Runtime.getRuntime().maxMemory();
1863
1864 if (memLimit > jvmLimit) {
1865 LOG.error("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
1866 " mb) is more than the maximum available for the JVM: " +
1867 jvmLimit / (1024 * 1024) + " mb");
1868 }
1869
1870 if (getPersistenceAdapter() != null) {
1871 PersistenceAdapter adapter = getPersistenceAdapter();
1872 File dir = adapter.getDirectory();
1873
1874 if (dir != null) {
1875 String dirPath = dir.getAbsolutePath();
1876 if (!dir.isAbsolute()) {
1877 dir = new File(dirPath);
1878 }
1879
1880 while (dir != null && dir.isDirectory() == false) {
1881 dir = dir.getParentFile();
1882 }
1883 long storeLimit = usage.getStoreUsage().getLimit();
1884 long dirFreeSpace = dir.getUsableSpace();
1885 if (storeLimit > dirFreeSpace) {
1886 LOG.warn("Store limit is " + storeLimit / (1024 * 1024) +
1887 " mb, whilst the data directory: " + dir.getAbsolutePath() +
1888 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1889 }
1890 }
1891
1892 long maxJournalFileSize = 0;
1893 long storeLimit = usage.getStoreUsage().getLimit();
1894
1895 if (adapter instanceof KahaDBPersistenceAdapter) {
1896 KahaDBPersistenceAdapter kahaDB = (KahaDBPersistenceAdapter) adapter;
1897 maxJournalFileSize = kahaDB.getJournalMaxFileLength();
1898 } else if (adapter instanceof AMQPersistenceAdapter) {
1899 AMQPersistenceAdapter amqAdapter = (AMQPersistenceAdapter) adapter;
1900 maxJournalFileSize = amqAdapter.getMaxFileLength();
1901 }
1902
1903 if (storeLimit < maxJournalFileSize) {
1904 LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
1905 " mb, whilst the max journal file size for the store is: " +
1906 maxJournalFileSize / (1024 * 1024) + " mb, " +
1907 "the store will not accept any data when used.");
1908 }
1909 }
1910
1911 File tmpDir = getTmpDataDirectory();
1912 if (tmpDir != null) {
1913
1914 String tmpDirPath = tmpDir.getAbsolutePath();
1915 if (!tmpDir.isAbsolute()) {
1916 tmpDir = new File(tmpDirPath);
1917 }
1918
1919 long storeLimit = usage.getTempUsage().getLimit();
1920 while (tmpDir != null && tmpDir.isDirectory() == false) {
1921 tmpDir = tmpDir.getParentFile();
1922 }
1923 long dirFreeSpace = tmpDir.getUsableSpace();
1924 if (storeLimit > dirFreeSpace) {
1925 LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
1926 " mb, whilst the temporary data directory: " + tmpDirPath +
1927 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1928 }
1929
1930 if (isPersistent()) {
1931 long maxJournalFileSize;
1932
1933 if (usage.getTempUsage().getStore() != null) {
1934 maxJournalFileSize = usage.getTempUsage().getStore().getJournalMaxFileLength();
1935 } else {
1936 maxJournalFileSize = org.apache.kahadb.journal.Journal.DEFAULT_MAX_FILE_LENGTH;
1937 }
1938
1939 if (storeLimit < maxJournalFileSize) {
1940 LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
1941 " mb, whilst the max journal file size for the temporary store is: " +
1942 maxJournalFileSize / (1024 * 1024) + " mb, " +
1943 "the temp store will not accept any data when used.");
1944 }
1945 }
1946 }
1947 }
1948
1949 public void stopAllConnectors(ServiceStopper stopper) {
1950 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1951 NetworkConnector connector = iter.next();
1952 unregisterNetworkConnectorMBean(connector);
1953 stopper.stop(connector);
1954 }
1955 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1956 ProxyConnector connector = iter.next();
1957 stopper.stop(connector);
1958 }
1959 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1960 JmsConnector connector = iter.next();
1961 stopper.stop(connector);
1962 }
1963 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1964 TransportConnector connector = iter.next();
1965 stopper.stop(connector);
1966 }
1967 }
1968
1969 protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
1970 try {
1971 ObjectName objectName = createConnectorObjectName(connector);
1972 connector = connector.asManagedConnector(getManagementContext(), objectName);
1973 ConnectorViewMBean view = new ConnectorView(connector);
1974 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1975 return connector;
1976 } catch (Throwable e) {
1977 throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1978 }
1979 }
1980
1981 protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
1982 if (isUseJmx()) {
1983 try {
1984 ObjectName objectName = createConnectorObjectName(connector);
1985 getManagementContext().unregisterMBean(objectName);
1986 } catch (Throwable e) {
1987 throw IOExceptionSupport.create(
1988 "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
1989 }
1990 }
1991 }
1992
1993 protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1994 return adaptor;
1995 }
1996
1997 protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1998 if (isUseJmx()) {
1999 }
2000 }
2001
2002 private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
2003 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2004 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName="
2005 + JMXSupport.encodeObjectNamePart(connector.getName()));
2006 }
2007
2008 protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
2009 NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
2010 try {
2011 ObjectName objectName = createNetworkConnectorObjectName(connector);
2012 connector.setObjectName(objectName);
2013 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2014 } catch (Throwable e) {
2015 throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
2016 }
2017 }
2018
2019 protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
2020 throws MalformedObjectNameException {
2021 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2022 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
2023 + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
2024 }
2025
2026
2027 public ObjectName createDuplexNetworkConnectorObjectName(String transport)
2028 throws MalformedObjectNameException {
2029 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2030 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
2031 + "NetworkConnectorName=duplex" + JMXSupport.encodeObjectNamePart(transport));
2032 }
2033
2034 protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
2035 if (isUseJmx()) {
2036 try {
2037 ObjectName objectName = createNetworkConnectorObjectName(connector);
2038 getManagementContext().unregisterMBean(objectName);
2039 } catch (Exception e) {
2040 LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
2041 }
2042 }
2043 }
2044
2045 protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
2046 ProxyConnectorView view = new ProxyConnectorView(connector);
2047 try {
2048 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2049 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector,"
2050 + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
2051 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2052 } catch (Throwable e) {
2053 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2054 }
2055 }
2056
2057 protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
2058 FTConnectorView view = new FTConnectorView(connector);
2059 try {
2060 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2061 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
2062 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2063 } catch (Throwable e) {
2064 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2065 }
2066 }
2067
2068 protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
2069 JmsConnectorView view = new JmsConnectorView(connector);
2070 try {
2071 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2072 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector,"
2073 + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
2074 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2075 } catch (Throwable e) {
2076 throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2077 }
2078 }
2079
2080 /**
2081 * Factory method to create a new broker
2082 *
2083 * @throws Exception
2084 * @throws
2085 * @throws
2086 */
2087 protected Broker createBroker() throws Exception {
2088 regionBroker = createRegionBroker();
2089 Broker broker = addInterceptors(regionBroker);
2090 // Add a filter that will stop access to the broker once stopped
2091 broker = new MutableBrokerFilter(broker) {
2092 Broker old;
2093
2094 @Override
2095 public void stop() throws Exception {
2096 old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
2097 // Just ignore additional stop actions.
2098 @Override
2099 public void stop() throws Exception {
2100 }
2101 });
2102 old.stop();
2103 }
2104
2105 @Override
2106 public void start() throws Exception {
2107 if (forceStart && old != null) {
2108 this.next.set(old);
2109 }
2110 getNext().start();
2111 }
2112 };
2113 return broker;
2114 }
2115
2116 /**
2117 * Factory method to create the core region broker onto which interceptors
2118 * are added
2119 *
2120 * @throws Exception
2121 */
2122 protected Broker createRegionBroker() throws Exception {
2123 if (destinationInterceptors == null) {
2124 destinationInterceptors = createDefaultDestinationInterceptor();
2125 }
2126 configureServices(destinationInterceptors);
2127 DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
2128 if (destinationFactory == null) {
2129 destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
2130 }
2131 return createRegionBroker(destinationInterceptor);
2132 }
2133
2134 protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
2135 RegionBroker regionBroker;
2136 if (isUseJmx()) {
2137 regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
2138 getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
2139 } else {
2140 regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
2141 destinationInterceptor,getScheduler(),getExecutor());
2142 }
2143 destinationFactory.setRegionBroker(regionBroker);
2144 regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
2145 regionBroker.setBrokerName(getBrokerName());
2146 regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
2147 regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
2148 if (brokerId != null) {
2149 regionBroker.setBrokerId(brokerId);
2150 }
2151 return regionBroker;
2152 }
2153
2154 /**
2155 * Create the default destination interceptor
2156 */
2157 protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
2158 List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
2159 if (isUseVirtualTopics()) {
2160 VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
2161 VirtualTopic virtualTopic = new VirtualTopic();
2162 virtualTopic.setName("VirtualTopic.>");
2163 VirtualDestination[] virtualDestinations = { virtualTopic };
2164 interceptor.setVirtualDestinations(virtualDestinations);
2165 answer.add(interceptor);
2166 }
2167 if (isUseMirroredQueues()) {
2168 MirroredQueue interceptor = new MirroredQueue();
2169 answer.add(interceptor);
2170 }
2171 DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
2172 answer.toArray(array);
2173 return array;
2174 }
2175
2176 /**
2177 * Strategy method to add interceptors to the broker
2178 *
2179 * @throws IOException
2180 */
2181 protected Broker addInterceptors(Broker broker) throws Exception {
2182 if (isSchedulerSupport()) {
2183 SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile());
2184 if (isUseJmx()) {
2185 JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
2186 try {
2187 ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":"
2188 + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
2189 + "Type=jobScheduler," + "jobSchedulerName=JMS");
2190
2191 AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2192 this.adminView.setJMSJobScheduler(objectName);
2193 } catch (Throwable e) {
2194 throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
2195 + e.getMessage(), e);
2196 }
2197
2198 }
2199 broker = sb;
2200 }
2201 if (isAdvisorySupport()) {
2202 broker = new AdvisoryBroker(broker);
2203 }
2204 broker = new CompositeDestinationBroker(broker);
2205 broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
2206 if (isPopulateJMSXUserID()) {
2207 UserIDBroker userIDBroker = new UserIDBroker(broker);
2208 userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
2209 broker = userIDBroker;
2210 }
2211 if (isMonitorConnectionSplits()) {
2212 broker = new ConnectionSplitBroker(broker);
2213 }
2214 if (plugins != null) {
2215 for (int i = 0; i < plugins.length; i++) {
2216 BrokerPlugin plugin = plugins[i];
2217 broker = plugin.installPlugin(broker);
2218 }
2219 }
2220 return broker;
2221 }
2222
2223 protected PersistenceAdapter createPersistenceAdapter() throws IOException {
2224 if (isPersistent()) {
2225 PersistenceAdapterFactory fac = getPersistenceFactory();
2226 if (fac != null) {
2227 return fac.createPersistenceAdapter();
2228 }else {
2229 KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
2230 File dir = new File(getBrokerDataDirectory(),"KahaDB");
2231 adaptor.setDirectory(dir);
2232 return adaptor;
2233 }
2234 } else {
2235 return new MemoryPersistenceAdapter();
2236 }
2237 }
2238
2239 protected ObjectName createBrokerObjectName() throws IOException {
2240 try {
2241 return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2242 + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
2243 } catch (Throwable e) {
2244 throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
2245 }
2246 }
2247
2248 protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2249 TransportServer transport = TransportFactory.bind(this, brokerURI);
2250 return new TransportConnector(transport);
2251 }
2252
2253 /**
2254 * Extracts the port from the options
2255 */
2256 protected Object getPort(Map options) {
2257 Object port = options.get("port");
2258 if (port == null) {
2259 port = DEFAULT_PORT;
2260 LOG.warn("No port specified so defaulting to: " + port);
2261 }
2262 return port;
2263 }
2264
2265 protected void addShutdownHook() {
2266 if (useShutdownHook) {
2267 shutdownHook = new Thread("ActiveMQ ShutdownHook") {
2268 @Override
2269 public void run() {
2270 containerShutdown();
2271 }
2272 };
2273 Runtime.getRuntime().addShutdownHook(shutdownHook);
2274 }
2275 }
2276
2277 protected void removeShutdownHook() {
2278 if (shutdownHook != null) {
2279 try {
2280 Runtime.getRuntime().removeShutdownHook(shutdownHook);
2281 } catch (Exception e) {
2282 LOG.debug("Caught exception, must be shutting down: " + e);
2283 }
2284 }
2285 }
2286
2287 /**
2288 * Sets hooks to be executed when broker shut down
2289 *
2290 * @org.apache.xbean.Property
2291 */
2292 public void setShutdownHooks(List<Runnable> hooks) throws Exception {
2293 for (Runnable hook : hooks) {
2294 addShutdownHook(hook);
2295 }
2296 }
2297
2298 /**
2299 * Causes a clean shutdown of the container when the VM is being shut down
2300 */
2301 protected void containerShutdown() {
2302 try {
2303 stop();
2304 } catch (IOException e) {
2305 Throwable linkedException = e.getCause();
2306 if (linkedException != null) {
2307 logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2308 } else {
2309 logError("Failed to shut down: " + e, e);
2310 }
2311 if (!useLoggingForShutdownErrors) {
2312 e.printStackTrace(System.err);
2313 }
2314 } catch (Exception e) {
2315 logError("Failed to shut down: " + e, e);
2316 }
2317 }
2318
2319 protected void logError(String message, Throwable e) {
2320 if (useLoggingForShutdownErrors) {
2321 LOG.error("Failed to shut down: " + e);
2322 } else {
2323 System.err.println("Failed to shut down: " + e);
2324 }
2325 }
2326
2327 /**
2328 * Starts any configured destinations on startup
2329 */
2330 protected void startDestinations() throws Exception {
2331 if (destinations != null) {
2332 ConnectionContext adminConnectionContext = getAdminConnectionContext();
2333 for (int i = 0; i < destinations.length; i++) {
2334 ActiveMQDestination destination = destinations[i];
2335 getBroker().addDestination(adminConnectionContext, destination,true);
2336 }
2337 }
2338 if (isUseVirtualTopics()) {
2339 startVirtualConsumerDestinations();
2340 }
2341 }
2342
2343 /**
2344 * Returns the broker's administration connection context used for
2345 * configuring the broker at startup
2346 */
2347 public ConnectionContext getAdminConnectionContext() throws Exception {
2348 return BrokerSupport.getConnectionContext(getBroker());
2349 }
2350
2351 protected void waitForSlave() {
2352 try {
2353 if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
2354 throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds.");
2355 }
2356 } catch (InterruptedException e) {
2357 LOG.error("Exception waiting for slave:" + e);
2358 }
2359 }
2360
2361 protected void slaveConnectionEstablished() {
2362 slaveStartSignal.countDown();
2363 }
2364
2365 protected void startManagementContext() throws Exception {
2366 getManagementContext().setBrokerName(brokerName);
2367 getManagementContext().start();
2368 adminView = new BrokerView(this, null);
2369 ObjectName objectName = getBrokerObjectName();
2370 AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2371 }
2372
2373 /**
2374 * Start all transport and network connections, proxies and bridges
2375 *
2376 * @throws Exception
2377 */
2378 public void startAllConnectors() throws Exception {
2379 if (!isSlave()) {
2380 Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2381 List<TransportConnector> al = new ArrayList<TransportConnector>();
2382 for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2383 TransportConnector connector = iter.next();
2384 connector.setBrokerService(this);
2385 al.add(startTransportConnector(connector));
2386 }
2387 if (al.size() > 0) {
2388 // let's clear the transportConnectors list and replace it with
2389 // the started transportConnector instances
2390 this.transportConnectors.clear();
2391 setTransportConnectors(al);
2392 }
2393 URI uri = getVmConnectorURI();
2394 Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
2395 map.put("network", "true");
2396 map.put("async", "false");
2397 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
2398 if (isWaitForSlave()) {
2399 waitForSlave();
2400 }
2401 if (!stopped.get()) {
2402 ThreadPoolExecutor networkConnectorStartExecutor = null;
2403 if (isNetworkConnectorStartAsync()) {
2404 // spin up as many threads as needed
2405 networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2406 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2407 new ThreadFactory() {
2408 int count=0;
2409 public Thread newThread(Runnable runnable) {
2410 Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2411 thread.setDaemon(true);
2412 return thread;
2413 }
2414 });
2415 }
2416
2417 for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2418 final NetworkConnector connector = iter.next();
2419 connector.setLocalUri(uri);
2420 connector.setBrokerName(getBrokerName());
2421 connector.setDurableDestinations(durableDestinations);
2422 if (getDefaultSocketURIString() != null) {
2423 connector.setBrokerURL(getDefaultSocketURIString());
2424 }
2425 if (networkConnectorStartExecutor != null) {
2426 networkConnectorStartExecutor.execute(new Runnable() {
2427 public void run() {
2428 try {
2429 LOG.info("Async start of " + connector);
2430 connector.start();
2431 } catch(Exception e) {
2432 LOG.error("Async start of network connector: " + connector + " failed", e);
2433 }
2434 }
2435 });
2436 } else {
2437 connector.start();
2438 }
2439 }
2440 if (networkConnectorStartExecutor != null) {
2441 // executor done when enqueued tasks are complete
2442 ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
2443 }
2444
2445 for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2446 ProxyConnector connector = iter.next();
2447 connector.start();
2448 }
2449 for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2450 JmsConnector connector = iter.next();
2451 connector.start();
2452 }
2453 for (Service service : services) {
2454 configureService(service);
2455 service.start();
2456 }
2457 }
2458 }
2459 }
2460
2461 protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2462 connector.setTaskRunnerFactory(getTaskRunnerFactory());
2463 MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2464 if (policy != null) {
2465 connector.setMessageAuthorizationPolicy(policy);
2466 }
2467 if (isUseJmx()) {
2468 connector = registerConnectorMBean(connector);
2469 }
2470 connector.getStatistics().setEnabled(enableStatistics);
2471 connector.start();
2472 return connector;
2473 }
2474
2475 /**
2476 * Perform any custom dependency injection
2477 */
2478 protected void configureServices(Object[] services) {
2479 for (Object service : services) {
2480 configureService(service);
2481 }
2482 }
2483
2484 /**
2485 * Perform any custom dependency injection
2486 */
2487 protected void configureService(Object service) {
2488 if (service instanceof BrokerServiceAware) {
2489 BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2490 serviceAware.setBrokerService(this);
2491 }
2492 if (masterConnector == null) {
2493 if (service instanceof MasterConnector) {
2494 masterConnector = (MasterConnector) service;
2495 supportFailOver = true;
2496 }
2497 }
2498 }
2499
2500 public void handleIOException(IOException exception) {
2501 if (ioExceptionHandler != null) {
2502 ioExceptionHandler.handle(exception);
2503 } else {
2504 LOG.info("No IOExceptionHandler registered, ignoring IO exception, " + exception, exception);
2505 }
2506 }
2507
2508 protected void startVirtualConsumerDestinations() throws Exception {
2509 ConnectionContext adminConnectionContext = getAdminConnectionContext();
2510 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
2511 DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
2512 if (!destinations.isEmpty()) {
2513 for (ActiveMQDestination destination : destinations) {
2514 if (filter.matches(destination) == true) {
2515 broker.addDestination(adminConnectionContext, destination, false);
2516 }
2517 }
2518 }
2519 }
2520
2521 private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
2522 // created at startup, so no sync needed
2523 if (virtualConsumerDestinationFilter == null) {
2524 Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
2525 if (destinationInterceptors != null) {
2526 for (DestinationInterceptor interceptor : destinationInterceptors) {
2527 if (interceptor instanceof VirtualDestinationInterceptor) {
2528 VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
2529 for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
2530 if (virtualDestination instanceof VirtualTopic) {
2531 consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
2532 }
2533 }
2534 }
2535 }
2536 }
2537 ActiveMQQueue filter = new ActiveMQQueue();
2538 filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
2539 virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
2540 }
2541 return virtualConsumerDestinationFilter;
2542 }
2543
2544 protected synchronized ThreadPoolExecutor getExecutor() {
2545 if (this.executor == null) {
2546 this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2547
2548 private long i = 0;
2549
2550 @Override
2551 public Thread newThread(Runnable runnable) {
2552 this.i++;
2553 Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i);
2554 thread.setDaemon(true);
2555 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
2556 @Override
2557 public void uncaughtException(final Thread t, final Throwable e) {
2558 LOG.error("Error in thread '{}'", t.getName(), e);
2559 }
2560 });
2561 return thread;
2562 }
2563 }, new RejectedExecutionHandler() {
2564 @Override
2565 public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
2566 try {
2567 executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
2568 } catch (InterruptedException e) {
2569 throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
2570 }
2571
2572 throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
2573 }
2574 });
2575 }
2576 return this.executor;
2577 }
2578
2579 public synchronized Scheduler getScheduler() {
2580 if (this.scheduler==null) {
2581 this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2582 try {
2583 this.scheduler.start();
2584 } catch (Exception e) {
2585 LOG.error("Failed to start Scheduler ",e);
2586 }
2587 }
2588 return this.scheduler;
2589 }
2590
2591 public Broker getRegionBroker() {
2592 return regionBroker;
2593 }
2594
2595 public void setRegionBroker(Broker regionBroker) {
2596 this.regionBroker = regionBroker;
2597 }
2598
2599 public void addShutdownHook(Runnable hook) {
2600 synchronized (shutdownHooks) {
2601 shutdownHooks.add(hook);
2602 }
2603 }
2604
2605 public void removeShutdownHook(Runnable hook) {
2606 synchronized (shutdownHooks) {
2607 shutdownHooks.remove(hook);
2608 }
2609 }
2610
2611 public boolean isSystemExitOnShutdown() {
2612 return systemExitOnShutdown;
2613 }
2614
2615 /**
2616 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2617 */
2618 public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2619 this.systemExitOnShutdown = systemExitOnShutdown;
2620 }
2621
2622 public int getSystemExitOnShutdownExitCode() {
2623 return systemExitOnShutdownExitCode;
2624 }
2625
2626 public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2627 this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2628 }
2629
2630 public SslContext getSslContext() {
2631 return sslContext;
2632 }
2633
2634 public void setSslContext(SslContext sslContext) {
2635 this.sslContext = sslContext;
2636 }
2637
2638 public boolean isShutdownOnSlaveFailure() {
2639 return shutdownOnSlaveFailure;
2640 }
2641
2642 /**
2643 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2644 */
2645 public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2646 this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2647 }
2648
2649 public boolean isWaitForSlave() {
2650 return waitForSlave;
2651 }
2652
2653 /**
2654 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2655 */
2656 public void setWaitForSlave(boolean waitForSlave) {
2657 this.waitForSlave = waitForSlave;
2658 }
2659
2660 public long getWaitForSlaveTimeout() {
2661 return this.waitForSlaveTimeout;
2662 }
2663
2664 public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
2665 this.waitForSlaveTimeout = waitForSlaveTimeout;
2666 }
2667
2668 public CountDownLatch getSlaveStartSignal() {
2669 return slaveStartSignal;
2670 }
2671
2672 /**
2673 * Get the passiveSlave
2674 * @return the passiveSlave
2675 */
2676 public boolean isPassiveSlave() {
2677 return this.passiveSlave;
2678 }
2679
2680 /**
2681 * Set the passiveSlave
2682 * @param passiveSlave the passiveSlave to set
2683 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2684 */
2685 public void setPassiveSlave(boolean passiveSlave) {
2686 this.passiveSlave = passiveSlave;
2687 }
2688
2689 /**
2690 * override the Default IOException handler, called when persistence adapter
2691 * has experiences File or JDBC I/O Exceptions
2692 *
2693 * @param ioExceptionHandler
2694 */
2695 public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2696 configureService(ioExceptionHandler);
2697 this.ioExceptionHandler = ioExceptionHandler;
2698 }
2699
2700 public IOExceptionHandler getIoExceptionHandler() {
2701 return ioExceptionHandler;
2702 }
2703
2704 /**
2705 * @return the schedulerSupport
2706 */
2707 public boolean isSchedulerSupport() {
2708 return this.schedulerSupport;
2709 }
2710
2711 /**
2712 * @param schedulerSupport the schedulerSupport to set
2713 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2714 */
2715 public void setSchedulerSupport(boolean schedulerSupport) {
2716 this.schedulerSupport = schedulerSupport;
2717 }
2718
2719 /**
2720 * @return the schedulerDirectory
2721 */
2722 public File getSchedulerDirectoryFile() {
2723 if (this.schedulerDirectoryFile == null) {
2724 this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2725 }
2726 return schedulerDirectoryFile;
2727 }
2728
2729 /**
2730 * @param schedulerDirectory the schedulerDirectory to set
2731 */
2732 public void setSchedulerDirectoryFile(File schedulerDirectory) {
2733 this.schedulerDirectoryFile = schedulerDirectory;
2734 }
2735
2736 public void setSchedulerDirectory(String schedulerDirectory) {
2737 setSchedulerDirectoryFile(new File(schedulerDirectory));
2738 }
2739
2740 public int getSchedulePeriodForDestinationPurge() {
2741 return this.schedulePeriodForDestinationPurge;
2742 }
2743
2744 public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
2745 this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
2746 }
2747
2748 public int getMaxPurgedDestinationsPerSweep() {
2749 return this.maxPurgedDestinationsPerSweep;
2750 }
2751
2752 public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
2753 this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
2754 }
2755
2756 public BrokerContext getBrokerContext() {
2757 return brokerContext;
2758 }
2759
2760 public void setBrokerContext(BrokerContext brokerContext) {
2761 this.brokerContext = brokerContext;
2762 }
2763
2764 public void setBrokerId(String brokerId) {
2765 this.brokerId = new BrokerId(brokerId);
2766 }
2767
2768 public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
2769 return useAuthenticatedPrincipalForJMSXUserID;
2770 }
2771
2772 public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
2773 this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
2774 }
2775
2776 /**
2777 * Should MBeans that support showing the Authenticated User Name information have this
2778 * value filled in or not.
2779 *
2780 * @return true if user names should be exposed in MBeans
2781 */
2782 public boolean isPopulateUserNameInMBeans() {
2783 return this.populateUserNameInMBeans;
2784 }
2785
2786 /**
2787 * Sets whether Authenticated User Name information is shown in MBeans that support this field.
2788 * @param value if MBeans should expose user name information.
2789 */
2790 public void setPopulateUserNameInMBeans(boolean value) {
2791 this.populateUserNameInMBeans = value;
2792 }
2793
2794 /**
2795 * Gets the time in Milliseconds that an invocation of an MBean method will wait before
2796 * failing. The default value is to wait forever (zero).
2797 *
2798 * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
2799 */
2800 public long getMbeanInvocationTimeout() {
2801 return mbeanInvocationTimeout;
2802 }
2803
2804 /**
2805 * Gets the time in Milliseconds that an invocation of an MBean method will wait before
2806 * failing. The default value is to wait forever (zero).
2807 *
2808 * @param mbeanInvocationTimeout
2809 * timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
2810 */
2811 public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) {
2812 this.mbeanInvocationTimeout = mbeanInvocationTimeout;
2813 }
2814
2815 public boolean isNetworkConnectorStartAsync() {
2816 return networkConnectorStartAsync;
2817 }
2818
2819 public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
2820 this.networkConnectorStartAsync = networkConnectorStartAsync;
2821 }
2822
2823 public boolean isAllowTempAutoCreationOnSend() {
2824 return allowTempAutoCreationOnSend;
2825 }
2826
2827 /**
2828 * enable if temp destinations need to be propagated through a network when
2829 * advisorySupport==false. This is used in conjunction with the policy
2830 * gcInactiveDestinations for matching temps so they can get removed
2831 * when inactive
2832 *
2833 * @param allowTempAutoCreationOnSend
2834 */
2835 public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
2836 this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
2837 }
2838
2839 public int getOfflineDurableSubscriberTimeout() {
2840 return offlineDurableSubscriberTimeout;
2841 }
2842
2843 public void setOfflineDurableSubscriberTimeout(int offlineDurableSubscriberTimeout) {
2844 this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
2845 }
2846
2847 public int getOfflineDurableSubscriberTaskSchedule() {
2848 return offlineDurableSubscriberTaskSchedule;
2849 }
2850
2851 public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) {
2852 this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
2853 }
2854
2855 public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
2856 return isUseVirtualTopics() && destination.isQueue() &&
2857 getVirtualTopicConsumerDestinationFilter().matches(destination);
2858 }
2859
2860 public Throwable getStartException() {
2861 return startException;
2862 }
2863
2864 public boolean isStartAsync() {
2865 return startAsync;
2866 }
2867
2868 public void setStartAsync(boolean startAsync) {
2869 this.startAsync = startAsync;
2870 }
2871 }