001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import java.io.BufferedReader;
020import java.io.File;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.InputStreamReader;
024import java.net.URI;
025import java.net.URISyntaxException;
026import java.net.UnknownHostException;
027import java.security.Provider;
028import java.security.Security;
029import java.util.ArrayList;
030import java.util.Date;
031import java.util.HashMap;
032import java.util.HashSet;
033import java.util.Iterator;
034import java.util.List;
035import java.util.Locale;
036import java.util.Map;
037import java.util.Set;
038import java.util.concurrent.CopyOnWriteArrayList;
039import java.util.concurrent.CountDownLatch;
040import java.util.concurrent.LinkedBlockingQueue;
041import java.util.concurrent.RejectedExecutionException;
042import java.util.concurrent.RejectedExecutionHandler;
043import java.util.concurrent.SynchronousQueue;
044import java.util.concurrent.ThreadFactory;
045import java.util.concurrent.ThreadPoolExecutor;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.atomic.AtomicBoolean;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.concurrent.atomic.AtomicLong;
050
051import javax.annotation.PostConstruct;
052import javax.annotation.PreDestroy;
053import javax.management.MalformedObjectNameException;
054import javax.management.ObjectName;
055
056import org.apache.activemq.ActiveMQConnectionMetaData;
057import org.apache.activemq.ConfigurationException;
058import org.apache.activemq.Service;
059import org.apache.activemq.advisory.AdvisoryBroker;
060import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
061import org.apache.activemq.broker.jmx.AnnotatedMBean;
062import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
063import org.apache.activemq.broker.jmx.BrokerView;
064import org.apache.activemq.broker.jmx.ConnectorView;
065import org.apache.activemq.broker.jmx.ConnectorViewMBean;
066import org.apache.activemq.broker.jmx.HealthView;
067import org.apache.activemq.broker.jmx.HealthViewMBean;
068import org.apache.activemq.broker.jmx.JmsConnectorView;
069import org.apache.activemq.broker.jmx.JobSchedulerView;
070import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
071import org.apache.activemq.broker.jmx.Log4JConfigView;
072import org.apache.activemq.broker.jmx.ManagedRegionBroker;
073import org.apache.activemq.broker.jmx.ManagementContext;
074import org.apache.activemq.broker.jmx.NetworkConnectorView;
075import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
076import org.apache.activemq.broker.jmx.ProxyConnectorView;
077import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
078import org.apache.activemq.broker.region.Destination;
079import org.apache.activemq.broker.region.DestinationFactory;
080import org.apache.activemq.broker.region.DestinationFactoryImpl;
081import org.apache.activemq.broker.region.DestinationInterceptor;
082import org.apache.activemq.broker.region.RegionBroker;
083import org.apache.activemq.broker.region.policy.PolicyMap;
084import org.apache.activemq.broker.region.virtual.MirroredQueue;
085import org.apache.activemq.broker.region.virtual.VirtualDestination;
086import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
087import org.apache.activemq.broker.region.virtual.VirtualTopic;
088import org.apache.activemq.broker.scheduler.JobSchedulerStore;
089import org.apache.activemq.broker.scheduler.SchedulerBroker;
090import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
091import org.apache.activemq.command.ActiveMQDestination;
092import org.apache.activemq.command.ActiveMQQueue;
093import org.apache.activemq.command.BrokerId;
094import org.apache.activemq.command.ProducerInfo;
095import org.apache.activemq.filter.DestinationFilter;
096import org.apache.activemq.network.ConnectionFilter;
097import org.apache.activemq.network.DiscoveryNetworkConnector;
098import org.apache.activemq.network.NetworkConnector;
099import org.apache.activemq.network.jms.JmsConnector;
100import org.apache.activemq.openwire.OpenWireFormat;
101import org.apache.activemq.proxy.ProxyConnector;
102import org.apache.activemq.security.MessageAuthorizationPolicy;
103import org.apache.activemq.selector.SelectorParser;
104import org.apache.activemq.store.JournaledStore;
105import org.apache.activemq.store.PListStore;
106import org.apache.activemq.store.PersistenceAdapter;
107import org.apache.activemq.store.PersistenceAdapterFactory;
108import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
109import org.apache.activemq.thread.Scheduler;
110import org.apache.activemq.thread.TaskRunnerFactory;
111import org.apache.activemq.transport.TransportFactorySupport;
112import org.apache.activemq.transport.TransportServer;
113import org.apache.activemq.transport.vm.VMTransportFactory;
114import org.apache.activemq.usage.StoreUsage;
115import org.apache.activemq.usage.SystemUsage;
116import org.apache.activemq.usage.Usage;
117import org.apache.activemq.util.BrokerSupport;
118import org.apache.activemq.util.DefaultIOExceptionHandler;
119import org.apache.activemq.util.IOExceptionHandler;
120import org.apache.activemq.util.IOExceptionSupport;
121import org.apache.activemq.util.IOHelper;
122import org.apache.activemq.util.InetAddressUtil;
123import org.apache.activemq.util.ServiceStopper;
124import org.apache.activemq.util.StoreUtil;
125import org.apache.activemq.util.ThreadPoolUtils;
126import org.apache.activemq.util.TimeUtils;
127import org.apache.activemq.util.URISupport;
128import org.slf4j.Logger;
129import org.slf4j.LoggerFactory;
130import org.slf4j.MDC;
131
132/**
133 * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
134 * number of transport connectors, network connectors and a bunch of properties
135 * which can be used to configure the broker as its lazily created.
136 *
137 * @org.apache.xbean.XBean
138 */
139public class BrokerService implements Service {
140    public static final String DEFAULT_PORT = "61616";
141    public static final String LOCAL_HOST_NAME;
142    public static final String BROKER_VERSION;
143    public static final String DEFAULT_BROKER_NAME = "localhost";
144    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
145    public static final long DEFAULT_START_TIMEOUT = 600000L;
146
147    private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
148
149    @SuppressWarnings("unused")
150    private static final long serialVersionUID = 7353129142305630237L;
151
152    private boolean useJmx = true;
153    private boolean enableStatistics = true;
154    private boolean persistent = true;
155    private boolean populateJMSXUserID;
156    private boolean useAuthenticatedPrincipalForJMSXUserID;
157    private boolean populateUserNameInMBeans;
158    private long mbeanInvocationTimeout = 0;
159
160    private boolean useShutdownHook = true;
161    private boolean useLoggingForShutdownErrors;
162    private boolean shutdownOnMasterFailure;
163    private boolean shutdownOnSlaveFailure;
164    private boolean waitForSlave;
165    private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT;
166    private boolean passiveSlave;
167    private String brokerName = DEFAULT_BROKER_NAME;
168    private File dataDirectoryFile;
169    private File tmpDataDirectory;
170    private Broker broker;
171    private BrokerView adminView;
172    private ManagementContext managementContext;
173    private ObjectName brokerObjectName;
174    private TaskRunnerFactory taskRunnerFactory;
175    private TaskRunnerFactory persistenceTaskRunnerFactory;
176    private SystemUsage systemUsage;
177    private SystemUsage producerSystemUsage;
178    private SystemUsage consumerSystemUsaage;
179    private PersistenceAdapter persistenceAdapter;
180    private PersistenceAdapterFactory persistenceFactory;
181    protected DestinationFactory destinationFactory;
182    private MessageAuthorizationPolicy messageAuthorizationPolicy;
183    private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
184    private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
185    private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
186    private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
187    private final List<Service> services = new ArrayList<Service>();
188    private transient Thread shutdownHook;
189    private String[] transportConnectorURIs;
190    private String[] networkConnectorURIs;
191    private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
192    // to other jms messaging systems
193    private boolean deleteAllMessagesOnStartup;
194    private boolean advisorySupport = true;
195    private URI vmConnectorURI;
196    private String defaultSocketURIString;
197    private PolicyMap destinationPolicy;
198    private final AtomicBoolean started = new AtomicBoolean(false);
199    private final AtomicBoolean stopped = new AtomicBoolean(false);
200    private final AtomicBoolean stopping = new AtomicBoolean(false);
201    private BrokerPlugin[] plugins;
202    private boolean keepDurableSubsActive = true;
203    private boolean useVirtualTopics = true;
204    private boolean useMirroredQueues = false;
205    private boolean useTempMirroredQueues = true;
206    /**
207     * Whether or not virtual destination subscriptions should cause network demand
208     */
209    private boolean useVirtualDestSubs = false;
210    /**
211     * Whether or not the creation of destinations that match virtual destinations
212     * should cause network demand
213     */
214    private boolean useVirtualDestSubsOnCreation = false;
215    private BrokerId brokerId;
216    private volatile DestinationInterceptor[] destinationInterceptors;
217    private ActiveMQDestination[] destinations;
218    private PListStore tempDataStore;
219    private int persistenceThreadPriority = Thread.MAX_PRIORITY;
220    private boolean useLocalHostBrokerName;
221    private final CountDownLatch stoppedLatch = new CountDownLatch(1);
222    private final CountDownLatch startedLatch = new CountDownLatch(1);
223    private Broker regionBroker;
224    private int producerSystemUsagePortion = 60;
225    private int consumerSystemUsagePortion = 40;
226    private boolean splitSystemUsageForProducersConsumers;
227    private boolean monitorConnectionSplits = false;
228    private int taskRunnerPriority = Thread.NORM_PRIORITY;
229    private boolean dedicatedTaskRunner;
230    private boolean cacheTempDestinations = false;// useful for failover
231    private int timeBeforePurgeTempDestinations = 5000;
232    private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
233    private boolean systemExitOnShutdown;
234    private int systemExitOnShutdownExitCode;
235    private SslContext sslContext;
236    private boolean forceStart = false;
237    private IOExceptionHandler ioExceptionHandler;
238    private boolean schedulerSupport = false;
239    private File schedulerDirectoryFile;
240    private Scheduler scheduler;
241    private ThreadPoolExecutor executor;
242    private int schedulePeriodForDestinationPurge= 0;
243    private int maxPurgedDestinationsPerSweep = 0;
244    private int schedulePeriodForDiskUsageCheck = 0;
245    private int diskUsageCheckRegrowThreshold = -1;
246    private boolean adjustUsageLimits = true;
247    private BrokerContext brokerContext;
248    private boolean networkConnectorStartAsync = false;
249    private boolean allowTempAutoCreationOnSend;
250    private JobSchedulerStore jobSchedulerStore;
251    private final AtomicLong totalConnections = new AtomicLong();
252    private final AtomicInteger currentConnections = new AtomicInteger();
253
254    private long offlineDurableSubscriberTimeout = -1;
255    private long offlineDurableSubscriberTaskSchedule = 300000;
256    private DestinationFilter virtualConsumerDestinationFilter;
257
258    private final AtomicBoolean persistenceAdapterStarted = new AtomicBoolean(false);
259    private Throwable startException = null;
260    private boolean startAsync = false;
261    private Date startDate;
262    private boolean slave = true;
263
264    private boolean restartAllowed = true;
265    private boolean restartRequested = false;
266    private boolean rejectDurableConsumers = false;
267    private boolean rollbackOnlyOnAsyncException = true;
268
269    private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
270
271    static {
272
273        try {
274            ClassLoader loader = BrokerService.class.getClassLoader();
275            Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider");
276            Provider bouncycastle = (Provider) clazz.newInstance();
277            Security.insertProviderAt(bouncycastle,
278                Integer.getInteger("org.apache.activemq.broker.BouncyCastlePosition", 2));
279            LOG.info("Loaded the Bouncy Castle security provider.");
280        } catch(Throwable e) {
281            // No BouncyCastle found so we use the default Java Security Provider
282        }
283
284        String localHostName = "localhost";
285        try {
286            localHostName =  InetAddressUtil.getLocalHostName();
287        } catch (UnknownHostException e) {
288            LOG.error("Failed to resolve localhost");
289        }
290        LOCAL_HOST_NAME = localHostName;
291
292        String version = null;
293        try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) {
294            if (in != null) {
295                try(InputStreamReader isr = new InputStreamReader(in);
296                    BufferedReader reader = new BufferedReader(isr)) {
297                    version = reader.readLine();
298                }
299            }
300        } catch (IOException ie) {
301            LOG.warn("Error reading broker version ", ie);
302        }
303        BROKER_VERSION = version;
304    }
305
306    @Override
307    public String toString() {
308        return "BrokerService[" + getBrokerName() + "]";
309    }
310
311    private String getBrokerVersion() {
312        String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
313        if (version == null) {
314            version = BROKER_VERSION;
315        }
316
317        return version;
318    }
319
320    /**
321     * Adds a new transport connector for the given bind address
322     *
323     * @return the newly created and added transport connector
324     * @throws Exception
325     */
326    public TransportConnector addConnector(String bindAddress) throws Exception {
327        return addConnector(new URI(bindAddress));
328    }
329
330    /**
331     * Adds a new transport connector for the given bind address
332     *
333     * @return the newly created and added transport connector
334     * @throws Exception
335     */
336    public TransportConnector addConnector(URI bindAddress) throws Exception {
337        return addConnector(createTransportConnector(bindAddress));
338    }
339
340    /**
341     * Adds a new transport connector for the given TransportServer transport
342     *
343     * @return the newly created and added transport connector
344     * @throws Exception
345     */
346    public TransportConnector addConnector(TransportServer transport) throws Exception {
347        return addConnector(new TransportConnector(transport));
348    }
349
350    /**
351     * Adds a new transport connector
352     *
353     * @return the transport connector
354     * @throws Exception
355     */
356    public TransportConnector addConnector(TransportConnector connector) throws Exception {
357        transportConnectors.add(connector);
358        return connector;
359    }
360
361    /**
362     * Stops and removes a transport connector from the broker.
363     *
364     * @param connector
365     * @return true if the connector has been previously added to the broker
366     * @throws Exception
367     */
368    public boolean removeConnector(TransportConnector connector) throws Exception {
369        boolean rc = transportConnectors.remove(connector);
370        if (rc) {
371            unregisterConnectorMBean(connector);
372        }
373        return rc;
374    }
375
376    /**
377     * Adds a new network connector using the given discovery address
378     *
379     * @return the newly created and added network connector
380     * @throws Exception
381     */
382    public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
383        return addNetworkConnector(new URI(discoveryAddress));
384    }
385
386    /**
387     * Adds a new proxy connector using the given bind address
388     *
389     * @return the newly created and added network connector
390     * @throws Exception
391     */
392    public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
393        return addProxyConnector(new URI(bindAddress));
394    }
395
396    /**
397     * Adds a new network connector using the given discovery address
398     *
399     * @return the newly created and added network connector
400     * @throws Exception
401     */
402    public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
403        NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
404        return addNetworkConnector(connector);
405    }
406
407    /**
408     * Adds a new proxy connector using the given bind address
409     *
410     * @return the newly created and added network connector
411     * @throws Exception
412     */
413    public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
414        ProxyConnector connector = new ProxyConnector();
415        connector.setBind(bindAddress);
416        connector.setRemote(new URI("fanout:multicast://default"));
417        return addProxyConnector(connector);
418    }
419
420    /**
421     * Adds a new network connector to connect this broker to a federated
422     * network
423     */
424    public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
425        connector.setBrokerService(this);
426        connector.setLocalUri(getVmConnectorURI());
427        // Set a connection filter so that the connector does not establish loop
428        // back connections.
429        connector.setConnectionFilter(new ConnectionFilter() {
430            @Override
431            public boolean connectTo(URI location) {
432                List<TransportConnector> transportConnectors = getTransportConnectors();
433                for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
434                    try {
435                        TransportConnector tc = iter.next();
436                        if (location.equals(tc.getConnectUri())) {
437                            return false;
438                        }
439                    } catch (Throwable e) {
440                    }
441                }
442                return true;
443            }
444        });
445        networkConnectors.add(connector);
446        return connector;
447    }
448
449    /**
450     * Removes the given network connector without stopping it. The caller
451     * should call {@link NetworkConnector#stop()} to close the connector
452     */
453    public boolean removeNetworkConnector(NetworkConnector connector) {
454        boolean answer = networkConnectors.remove(connector);
455        if (answer) {
456            unregisterNetworkConnectorMBean(connector);
457        }
458        return answer;
459    }
460
461    public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
462        URI uri = getVmConnectorURI();
463        connector.setLocalUri(uri);
464        proxyConnectors.add(connector);
465        if (isUseJmx()) {
466            registerProxyConnectorMBean(connector);
467        }
468        return connector;
469    }
470
471    public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
472        connector.setBrokerService(this);
473        jmsConnectors.add(connector);
474        if (isUseJmx()) {
475            registerJmsConnectorMBean(connector);
476        }
477        return connector;
478    }
479
480    public JmsConnector removeJmsConnector(JmsConnector connector) {
481        if (jmsConnectors.remove(connector)) {
482            return connector;
483        }
484        return null;
485    }
486
487    public void masterFailed() {
488        if (shutdownOnMasterFailure) {
489            LOG.error("The Master has failed ... shutting down");
490            try {
491                stop();
492            } catch (Exception e) {
493                LOG.error("Failed to stop for master failure", e);
494            }
495        } else {
496            LOG.warn("Master Failed - starting all connectors");
497            try {
498                startAllConnectors();
499                broker.nowMasterBroker();
500            } catch (Exception e) {
501                LOG.error("Failed to startAllConnectors", e);
502            }
503        }
504    }
505
506    public String getUptime() {
507        long delta = getUptimeMillis();
508
509        if (delta == 0) {
510            return "not started";
511        }
512
513        return TimeUtils.printDuration(delta);
514    }
515
516    public long getUptimeMillis() {
517        if (startDate == null) {
518            return 0;
519        }
520
521        return new Date().getTime() - startDate.getTime();
522    }
523
524    public boolean isStarted() {
525        return started.get() && startedLatch.getCount() == 0;
526    }
527
528    /**
529     * Forces a start of the broker.
530     * By default a BrokerService instance that was
531     * previously stopped using BrokerService.stop() cannot be restarted
532     * using BrokerService.start().
533     * This method enforces a restart.
534     * It is not recommended to force a restart of the broker and will not work
535     * for most but some very trivial broker configurations.
536     * For restarting a broker instance we recommend to first call stop() on
537     * the old instance and then recreate a new BrokerService instance.
538     *
539     * @param force - if true enforces a restart.
540     * @throws Exception
541     */
542    public void start(boolean force) throws Exception {
543        forceStart = force;
544        stopped.set(false);
545        started.set(false);
546        start();
547    }
548
549    // Service interface
550    // -------------------------------------------------------------------------
551
552    protected boolean shouldAutostart() {
553        return true;
554    }
555
556    /**
557     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
558     *
559     * delegates to autoStart, done to prevent backwards incompatible signature change
560     */
561    @PostConstruct
562    private void postConstruct() {
563        try {
564            autoStart();
565        } catch (Exception ex) {
566            throw new RuntimeException(ex);
567        }
568    }
569
570    /**
571     *
572     * @throws Exception
573     * @org. apache.xbean.InitMethod
574     */
575    public void autoStart() throws Exception {
576        if(shouldAutostart()) {
577            start();
578        }
579    }
580
581    @Override
582    public void start() throws Exception {
583        if (stopped.get() || !started.compareAndSet(false, true)) {
584            // lets just ignore redundant start() calls
585            // as its way too easy to not be completely sure if start() has been
586            // called or not with the gazillion of different configuration
587            // mechanisms
588            // throw new IllegalStateException("Already started.");
589            return;
590        }
591
592        setStartException(null);
593        stopping.set(false);
594        startDate = new Date();
595        MDC.put("activemq.broker", brokerName);
596
597        try {
598            checkMemorySystemUsageLimits();
599            if (systemExitOnShutdown && useShutdownHook) {
600                throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
601            }
602            processHelperProperties();
603            if (isUseJmx()) {
604                // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and
605                // we cannot cleanup clear that during shutdown of the broker.
606                MDC.remove("activemq.broker");
607                try {
608                    startManagementContext();
609                    for (NetworkConnector connector : getNetworkConnectors()) {
610                        registerNetworkConnectorMBean(connector);
611                    }
612                } finally {
613                    MDC.put("activemq.broker", brokerName);
614                }
615            }
616
617            // in jvm master slave, lets not publish over existing broker till we get the lock
618            final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
619            if (brokerRegistry.lookup(getBrokerName()) == null) {
620                brokerRegistry.bind(getBrokerName(), BrokerService.this);
621            }
622            startPersistenceAdapter(startAsync);
623            startBroker(startAsync);
624            brokerRegistry.bind(getBrokerName(), BrokerService.this);
625        } catch (Exception e) {
626            LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e);
627            try {
628                if (!stopped.get()) {
629                    stop();
630                }
631            } catch (Exception ex) {
632                LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
633            }
634            throw e;
635        } finally {
636            MDC.remove("activemq.broker");
637        }
638    }
639
640    private void startPersistenceAdapter(boolean async) throws Exception {
641        if (async) {
642            new Thread("Persistence Adapter Starting Thread") {
643                @Override
644                public void run() {
645                    try {
646                        doStartPersistenceAdapter();
647                    } catch (Throwable e) {
648                        setStartException(e);
649                    } finally {
650                        synchronized (persistenceAdapterStarted) {
651                            persistenceAdapterStarted.set(true);
652                            persistenceAdapterStarted.notifyAll();
653                        }
654                    }
655                }
656            }.start();
657        } else {
658            doStartPersistenceAdapter();
659        }
660    }
661
662    private void doStartPersistenceAdapter() throws Exception {
663        PersistenceAdapter persistenceAdapterToStart = getPersistenceAdapter();
664        if (persistenceAdapterToStart == null) {
665            checkStartException();
666            throw new ConfigurationException("Cannot start null persistence adapter");
667        }
668        persistenceAdapterToStart.setUsageManager(getProducerSystemUsage());
669        persistenceAdapterToStart.setBrokerName(getBrokerName());
670        LOG.info("Using Persistence Adapter: {}", persistenceAdapterToStart);
671        if (deleteAllMessagesOnStartup) {
672            deleteAllMessages();
673        }
674        persistenceAdapterToStart.start();
675
676        getTempDataStore();
677        if (tempDataStore != null) {
678            try {
679                // start after we have the store lock
680                tempDataStore.start();
681            } catch (Exception e) {
682                RuntimeException exception = new RuntimeException(
683                        "Failed to start temp data store: " + tempDataStore, e);
684                LOG.error(exception.getLocalizedMessage(), e);
685                throw exception;
686            }
687        }
688
689        getJobSchedulerStore();
690        if (jobSchedulerStore != null) {
691            try {
692                jobSchedulerStore.start();
693            } catch (Exception e) {
694                RuntimeException exception = new RuntimeException(
695                        "Failed to start job scheduler store: " + jobSchedulerStore, e);
696                LOG.error(exception.getLocalizedMessage(), e);
697                throw exception;
698            }
699        }
700    }
701
702    private void startBroker(boolean async) throws Exception {
703        if (async) {
704            new Thread("Broker Starting Thread") {
705                @Override
706                public void run() {
707                    try {
708                        synchronized (persistenceAdapterStarted) {
709                            if (!persistenceAdapterStarted.get()) {
710                                persistenceAdapterStarted.wait();
711                            }
712                        }
713                        doStartBroker();
714                    } catch (Throwable t) {
715                        setStartException(t);
716                    }
717                }
718            }.start();
719        } else {
720            doStartBroker();
721        }
722    }
723
724    private void doStartBroker() throws Exception {
725        checkStartException();
726        startDestinations();
727        addShutdownHook();
728
729        broker = getBroker();
730        brokerId = broker.getBrokerId();
731
732        // need to log this after creating the broker so we have its id and name
733        LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId });
734        broker.start();
735
736        if (isUseJmx()) {
737            if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
738                // try to restart management context
739                // typical for slaves that use the same ports as master
740                managementContext.stop();
741                startManagementContext();
742            }
743            ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
744            managedBroker.setContextBroker(broker);
745            adminView.setBroker(managedBroker);
746        }
747
748        if (ioExceptionHandler == null) {
749            setIoExceptionHandler(new DefaultIOExceptionHandler());
750        }
751
752        if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) {
753            ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString());
754            Log4JConfigView log4jConfigView = new Log4JConfigView();
755            AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName);
756        }
757
758        startAllConnectors();
759
760        LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
761        LOG.info("For help or more information please see: http://activemq.apache.org");
762
763        getBroker().brokerServiceStarted();
764        checkStoreSystemUsageLimits();
765        startedLatch.countDown();
766        getBroker().nowMasterBroker();
767    }
768
769    /**
770     * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
771     *
772     * delegates to stop, done to prevent backwards incompatible signature change
773     */
774    @PreDestroy
775    private void preDestroy () {
776        try {
777            stop();
778        } catch (Exception ex) {
779            throw new RuntimeException();
780        }
781    }
782
783    /**
784     *
785     * @throws Exception
786     * @org.apache .xbean.DestroyMethod
787     */
788    @Override
789    public void stop() throws Exception {
790        if (!stopping.compareAndSet(false, true)) {
791            LOG.trace("Broker already stopping/stopped");
792            return;
793        }
794
795        setStartException(new BrokerStoppedException("Stop invoked"));
796        MDC.put("activemq.broker", brokerName);
797
798        if (systemExitOnShutdown) {
799            new Thread() {
800                @Override
801                public void run() {
802                    System.exit(systemExitOnShutdownExitCode);
803                }
804            }.start();
805        }
806
807        LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} );
808
809        removeShutdownHook();
810        if (this.scheduler != null) {
811            this.scheduler.stop();
812            this.scheduler = null;
813        }
814        ServiceStopper stopper = new ServiceStopper();
815        if (services != null) {
816            for (Service service : services) {
817                stopper.stop(service);
818            }
819        }
820        stopAllConnectors(stopper);
821        this.slave = true;
822        // remove any VMTransports connected
823        // this has to be done after services are stopped,
824        // to avoid timing issue with discovery (spinning up a new instance)
825        BrokerRegistry.getInstance().unbind(getBrokerName());
826        VMTransportFactory.stopped(getBrokerName());
827        if (broker != null) {
828            stopper.stop(broker);
829            broker = null;
830        }
831
832        if (jobSchedulerStore != null) {
833            jobSchedulerStore.stop();
834            jobSchedulerStore = null;
835        }
836        if (tempDataStore != null) {
837            tempDataStore.stop();
838            tempDataStore = null;
839        }
840        try {
841            stopper.stop(getPersistenceAdapter());
842            persistenceAdapter = null;
843            if (isUseJmx()) {
844                stopper.stop(managementContext);
845                managementContext = null;
846            }
847            // Clear SelectorParser cache to free memory
848            SelectorParser.clearCache();
849        } finally {
850            started.set(false);
851            stopped.set(true);
852            stoppedLatch.countDown();
853        }
854
855        if (this.taskRunnerFactory != null) {
856            this.taskRunnerFactory.shutdown();
857            this.taskRunnerFactory = null;
858        }
859        if (this.executor != null) {
860            ThreadPoolUtils.shutdownNow(executor);
861            this.executor = null;
862        }
863
864        this.destinationInterceptors = null;
865        this.destinationFactory = null;
866
867        if (startDate != null) {
868            LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()});
869        }
870        LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
871
872        synchronized (shutdownHooks) {
873            for (Runnable hook : shutdownHooks) {
874                try {
875                    hook.run();
876                } catch (Throwable e) {
877                    stopper.onException(hook, e);
878                }
879            }
880        }
881
882        MDC.remove("activemq.broker");
883
884        // and clear start date
885        startDate = null;
886
887        stopper.throwFirstException();
888    }
889
890    public boolean checkQueueSize(String queueName) {
891        long count = 0;
892        long queueSize = 0;
893        Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
894        for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
895            if (entry.getKey().isQueue()) {
896                if (entry.getValue().getName().matches(queueName)) {
897                    queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
898                    count += queueSize;
899                    if (queueSize > 0) {
900                        LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize);
901                    }
902                }
903            }
904        }
905        return count == 0;
906    }
907
908    /**
909     * This method (both connectorName and queueName are using regex to match)
910     * 1. stop the connector (supposed the user input the connector which the
911     * clients connect to) 2. to check whether there is any pending message on
912     * the queues defined by queueName 3. supposedly, after stop the connector,
913     * client should failover to other broker and pending messages should be
914     * forwarded. if no pending messages, the method finally call stop to stop
915     * the broker.
916     *
917     * @param connectorName
918     * @param queueName
919     * @param timeout
920     * @param pollInterval
921     * @throws Exception
922     */
923    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
924        if (isUseJmx()) {
925            if (connectorName == null || queueName == null || timeout <= 0) {
926                throw new Exception(
927                        "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
928            }
929            if (pollInterval <= 0) {
930                pollInterval = 30;
931            }
932            LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{
933                    connectorName, queueName, timeout, pollInterval
934            });
935            TransportConnector connector;
936            for (int i = 0; i < transportConnectors.size(); i++) {
937                connector = transportConnectors.get(i);
938                if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
939                    connector.stop();
940                }
941            }
942            long start = System.currentTimeMillis();
943            while (System.currentTimeMillis() - start < timeout * 1000) {
944                // check quesize until it gets zero
945                if (checkQueueSize(queueName)) {
946                    stop();
947                    break;
948                } else {
949                    Thread.sleep(pollInterval * 1000);
950                }
951            }
952            if (stopped.get()) {
953                LOG.info("Successfully stop the broker.");
954            } else {
955                LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
956            }
957        }
958    }
959
960    /**
961     * A helper method to block the caller thread until the broker has been
962     * stopped
963     */
964    public void waitUntilStopped() {
965        while (isStarted() && !stopped.get()) {
966            try {
967                stoppedLatch.await();
968            } catch (InterruptedException e) {
969                // ignore
970            }
971        }
972    }
973
974    public boolean isStopped() {
975        return stopped.get();
976    }
977
978    /**
979     * A helper method to block the caller thread until the broker has fully started
980     * @return boolean true if wait succeeded false if broker was not started or was stopped
981     */
982    public boolean waitUntilStarted() {
983        return waitUntilStarted(DEFAULT_START_TIMEOUT);
984    }
985
986    /**
987     * A helper method to block the caller thread until the broker has fully started
988     *
989     * @param timeout
990     *        the amount of time to wait before giving up and returning false.
991     *
992     * @return boolean true if wait succeeded false if broker was not started or was stopped
993     */
994    public boolean waitUntilStarted(long timeout) {
995        boolean waitSucceeded = isStarted();
996        long expiration = Math.max(0, timeout + System.currentTimeMillis());
997        while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
998            try {
999                if (getStartException() != null) {
1000                    return waitSucceeded;
1001                }
1002                waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
1003            } catch (InterruptedException ignore) {
1004            }
1005        }
1006        return waitSucceeded;
1007    }
1008
1009    // Properties
1010    // -------------------------------------------------------------------------
1011    /**
1012     * Returns the message broker
1013     */
1014    public Broker getBroker() throws Exception {
1015        if (broker == null) {
1016            checkStartException();
1017            broker = createBroker();
1018        }
1019        return broker;
1020    }
1021
1022    /**
1023     * Returns the administration view of the broker; used to create and destroy
1024     * resources such as queues and topics. Note this method returns null if JMX
1025     * is disabled.
1026     */
1027    public BrokerView getAdminView() throws Exception {
1028        if (adminView == null) {
1029            // force lazy creation
1030            getBroker();
1031        }
1032        return adminView;
1033    }
1034
1035    public void setAdminView(BrokerView adminView) {
1036        this.adminView = adminView;
1037    }
1038
1039    public String getBrokerName() {
1040        return brokerName;
1041    }
1042
1043    /**
1044     * Sets the name of this broker; which must be unique in the network
1045     *
1046     * @param brokerName
1047     */
1048    private static final String brokerNameReplacedCharsRegExp = "[^a-zA-Z0-9\\.\\_\\-\\:]";
1049    public void setBrokerName(String brokerName) {
1050        if (brokerName == null) {
1051            throw new NullPointerException("The broker name cannot be null");
1052        }
1053        String str = brokerName.replaceAll(brokerNameReplacedCharsRegExp, "_");
1054        if (!str.equals(brokerName)) {
1055            LOG.error("Broker Name: {} contained illegal characters matching regExp: {} - replaced with {}", brokerName, brokerNameReplacedCharsRegExp, str);
1056        }
1057        this.brokerName = str.trim();
1058    }
1059
1060    public PersistenceAdapterFactory getPersistenceFactory() {
1061        return persistenceFactory;
1062    }
1063
1064    public File getDataDirectoryFile() {
1065        if (dataDirectoryFile == null) {
1066            dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
1067        }
1068        return dataDirectoryFile;
1069    }
1070
1071    public File getBrokerDataDirectory() {
1072        String brokerDir = getBrokerName();
1073        return new File(getDataDirectoryFile(), brokerDir);
1074    }
1075
1076    /**
1077     * Sets the directory in which the data files will be stored by default for
1078     * the JDBC and Journal persistence adaptors.
1079     *
1080     * @param dataDirectory
1081     *            the directory to store data files
1082     */
1083    public void setDataDirectory(String dataDirectory) {
1084        setDataDirectoryFile(new File(dataDirectory));
1085    }
1086
1087    /**
1088     * Sets the directory in which the data files will be stored by default for
1089     * the JDBC and Journal persistence adaptors.
1090     *
1091     * @param dataDirectoryFile
1092     *            the directory to store data files
1093     */
1094    public void setDataDirectoryFile(File dataDirectoryFile) {
1095        this.dataDirectoryFile = dataDirectoryFile;
1096    }
1097
1098    /**
1099     * @return the tmpDataDirectory
1100     */
1101    public File getTmpDataDirectory() {
1102        if (tmpDataDirectory == null) {
1103            tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
1104        }
1105        return tmpDataDirectory;
1106    }
1107
1108    /**
1109     * @param tmpDataDirectory
1110     *            the tmpDataDirectory to set
1111     */
1112    public void setTmpDataDirectory(File tmpDataDirectory) {
1113        this.tmpDataDirectory = tmpDataDirectory;
1114    }
1115
1116    public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
1117        this.persistenceFactory = persistenceFactory;
1118    }
1119
1120    public void setDestinationFactory(DestinationFactory destinationFactory) {
1121        this.destinationFactory = destinationFactory;
1122    }
1123
1124    public boolean isPersistent() {
1125        return persistent;
1126    }
1127
1128    /**
1129     * Sets whether or not persistence is enabled or disabled.
1130     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1131     */
1132    public void setPersistent(boolean persistent) {
1133        this.persistent = persistent;
1134    }
1135
1136    public boolean isPopulateJMSXUserID() {
1137        return populateJMSXUserID;
1138    }
1139
1140    /**
1141     * Sets whether or not the broker should populate the JMSXUserID header.
1142     */
1143    public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
1144        this.populateJMSXUserID = populateJMSXUserID;
1145    }
1146
1147    public SystemUsage getSystemUsage() {
1148        try {
1149            if (systemUsage == null) {
1150
1151                systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
1152                systemUsage.setExecutor(getExecutor());
1153                systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB
1154                systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1155                systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
1156                systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1157                addService(this.systemUsage);
1158            }
1159            return systemUsage;
1160        } catch (IOException e) {
1161            LOG.error("Cannot create SystemUsage", e);
1162            throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage(), e);
1163        }
1164    }
1165
1166    public void setSystemUsage(SystemUsage memoryManager) {
1167        if (this.systemUsage != null) {
1168            removeService(this.systemUsage);
1169        }
1170        this.systemUsage = memoryManager;
1171        if (this.systemUsage.getExecutor()==null) {
1172            this.systemUsage.setExecutor(getExecutor());
1173        }
1174        addService(this.systemUsage);
1175    }
1176
1177    /**
1178     * @return the consumerUsageManager
1179     * @throws IOException
1180     */
1181    public SystemUsage getConsumerSystemUsage() throws IOException {
1182        if (this.consumerSystemUsaage == null) {
1183            if (splitSystemUsageForProducersConsumers) {
1184                this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
1185                float portion = consumerSystemUsagePortion / 100f;
1186                this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
1187                addService(this.consumerSystemUsaage);
1188            } else {
1189                consumerSystemUsaage = getSystemUsage();
1190            }
1191        }
1192        return this.consumerSystemUsaage;
1193    }
1194
1195    /**
1196     * @param consumerSystemUsaage
1197     *            the storeSystemUsage to set
1198     */
1199    public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
1200        if (this.consumerSystemUsaage != null) {
1201            removeService(this.consumerSystemUsaage);
1202        }
1203        this.consumerSystemUsaage = consumerSystemUsaage;
1204        addService(this.consumerSystemUsaage);
1205    }
1206
1207    /**
1208     * @return the producerUsageManager
1209     * @throws IOException
1210     */
1211    public SystemUsage getProducerSystemUsage() throws IOException {
1212        if (producerSystemUsage == null) {
1213            if (splitSystemUsageForProducersConsumers) {
1214                producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
1215                float portion = producerSystemUsagePortion / 100f;
1216                producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
1217                addService(producerSystemUsage);
1218            } else {
1219                producerSystemUsage = getSystemUsage();
1220            }
1221        }
1222        return producerSystemUsage;
1223    }
1224
1225    /**
1226     * @param producerUsageManager
1227     *            the producerUsageManager to set
1228     */
1229    public void setProducerSystemUsage(SystemUsage producerUsageManager) {
1230        if (this.producerSystemUsage != null) {
1231            removeService(this.producerSystemUsage);
1232        }
1233        this.producerSystemUsage = producerUsageManager;
1234        addService(this.producerSystemUsage);
1235    }
1236
1237    public synchronized PersistenceAdapter getPersistenceAdapter() throws IOException {
1238        if (persistenceAdapter == null && !hasStartException()) {
1239            persistenceAdapter = createPersistenceAdapter();
1240            configureService(persistenceAdapter);
1241            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1242        }
1243        return persistenceAdapter;
1244    }
1245
1246    /**
1247     * Sets the persistence adaptor implementation to use for this broker
1248     *
1249     * @throws IOException
1250     */
1251    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1252        if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) {
1253            LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter);
1254            return;
1255        }
1256        this.persistenceAdapter = persistenceAdapter;
1257        configureService(this.persistenceAdapter);
1258        this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1259    }
1260
1261    public TaskRunnerFactory getTaskRunnerFactory() {
1262        if (this.taskRunnerFactory == null) {
1263            this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1264                    isDedicatedTaskRunner());
1265            this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
1266        }
1267        return this.taskRunnerFactory;
1268    }
1269
1270    public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
1271        this.taskRunnerFactory = taskRunnerFactory;
1272    }
1273
1274    public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1275        if (taskRunnerFactory == null) {
1276            persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1277                    true, 1000, isDedicatedTaskRunner());
1278        }
1279        return persistenceTaskRunnerFactory;
1280    }
1281
1282    public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
1283        this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
1284    }
1285
1286    public boolean isUseJmx() {
1287        return useJmx;
1288    }
1289
1290    public boolean isEnableStatistics() {
1291        return enableStatistics;
1292    }
1293
1294    /**
1295     * Sets whether or not the Broker's services enable statistics or not.
1296     */
1297    public void setEnableStatistics(boolean enableStatistics) {
1298        this.enableStatistics = enableStatistics;
1299    }
1300
1301    /**
1302     * Sets whether or not the Broker's services should be exposed into JMX or
1303     * not.
1304     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1305     */
1306    public void setUseJmx(boolean useJmx) {
1307        this.useJmx = useJmx;
1308    }
1309
1310    public ObjectName getBrokerObjectName() throws MalformedObjectNameException {
1311        if (brokerObjectName == null) {
1312            brokerObjectName = createBrokerObjectName();
1313        }
1314        return brokerObjectName;
1315    }
1316
1317    /**
1318     * Sets the JMX ObjectName for this broker
1319     */
1320    public void setBrokerObjectName(ObjectName brokerObjectName) {
1321        this.brokerObjectName = brokerObjectName;
1322    }
1323
1324    public ManagementContext getManagementContext() {
1325        if (managementContext == null) {
1326            checkStartException();
1327            managementContext = new ManagementContext();
1328        }
1329        return managementContext;
1330    }
1331
1332    synchronized private void checkStartException() {
1333        if (startException != null) {
1334            throw new BrokerStoppedException(startException);
1335        }
1336    }
1337
1338    synchronized private boolean hasStartException() {
1339        return startException != null;
1340    }
1341
1342    synchronized private void setStartException(Throwable t) {
1343        startException = t;
1344    }
1345
1346    public void setManagementContext(ManagementContext managementContext) {
1347        this.managementContext = managementContext;
1348    }
1349
1350    public NetworkConnector getNetworkConnectorByName(String connectorName) {
1351        for (NetworkConnector connector : networkConnectors) {
1352            if (connector.getName().equals(connectorName)) {
1353                return connector;
1354            }
1355        }
1356        return null;
1357    }
1358
1359    public String[] getNetworkConnectorURIs() {
1360        return networkConnectorURIs;
1361    }
1362
1363    public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1364        this.networkConnectorURIs = networkConnectorURIs;
1365    }
1366
1367    public TransportConnector getConnectorByName(String connectorName) {
1368        for (TransportConnector connector : transportConnectors) {
1369            if (connector.getName().equals(connectorName)) {
1370                return connector;
1371            }
1372        }
1373        return null;
1374    }
1375
1376    public Map<String, String> getTransportConnectorURIsAsMap() {
1377        Map<String, String> answer = new HashMap<String, String>();
1378        for (TransportConnector connector : transportConnectors) {
1379            try {
1380                URI uri = connector.getConnectUri();
1381                if (uri != null) {
1382                    String scheme = uri.getScheme();
1383                    if (scheme != null) {
1384                        answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString());
1385                    }
1386                }
1387            } catch (Exception e) {
1388                LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1389            }
1390        }
1391        return answer;
1392    }
1393
1394    public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){
1395        ProducerBrokerExchange result = null;
1396
1397        for (TransportConnector connector : transportConnectors) {
1398            for (TransportConnection tc: connector.getConnections()){
1399                result = tc.getProducerBrokerExchangeIfExists(producerInfo);
1400                if (result !=null){
1401                    return result;
1402                }
1403            }
1404        }
1405        return result;
1406    }
1407
1408    public String[] getTransportConnectorURIs() {
1409        return transportConnectorURIs;
1410    }
1411
1412    public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1413        this.transportConnectorURIs = transportConnectorURIs;
1414    }
1415
1416    /**
1417     * @return Returns the jmsBridgeConnectors.
1418     */
1419    public JmsConnector[] getJmsBridgeConnectors() {
1420        return jmsBridgeConnectors;
1421    }
1422
1423    /**
1424     * @param jmsConnectors
1425     *            The jmsBridgeConnectors to set.
1426     */
1427    public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1428        this.jmsBridgeConnectors = jmsConnectors;
1429    }
1430
1431    public Service[] getServices() {
1432        return services.toArray(new Service[0]);
1433    }
1434
1435    /**
1436     * Sets the services associated with this broker.
1437     */
1438    public void setServices(Service[] services) {
1439        this.services.clear();
1440        if (services != null) {
1441            for (int i = 0; i < services.length; i++) {
1442                this.services.add(services[i]);
1443            }
1444        }
1445    }
1446
1447    /**
1448     * Adds a new service so that it will be started as part of the broker
1449     * lifecycle
1450     */
1451    public void addService(Service service) {
1452        services.add(service);
1453    }
1454
1455    public void removeService(Service service) {
1456        services.remove(service);
1457    }
1458
1459    public boolean isUseLoggingForShutdownErrors() {
1460        return useLoggingForShutdownErrors;
1461    }
1462
1463    /**
1464     * Sets whether or not we should use commons-logging when reporting errors
1465     * when shutting down the broker
1466     */
1467    public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1468        this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1469    }
1470
1471    public boolean isUseShutdownHook() {
1472        return useShutdownHook;
1473    }
1474
1475    /**
1476     * Sets whether or not we should use a shutdown handler to close down the
1477     * broker cleanly if the JVM is terminated. It is recommended you leave this
1478     * enabled.
1479     */
1480    public void setUseShutdownHook(boolean useShutdownHook) {
1481        this.useShutdownHook = useShutdownHook;
1482    }
1483
1484    public boolean isAdvisorySupport() {
1485        return advisorySupport;
1486    }
1487
1488    /**
1489     * Allows the support of advisory messages to be disabled for performance
1490     * reasons.
1491     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1492     */
1493    public void setAdvisorySupport(boolean advisorySupport) {
1494        this.advisorySupport = advisorySupport;
1495    }
1496
1497    public List<TransportConnector> getTransportConnectors() {
1498        return new ArrayList<TransportConnector>(transportConnectors);
1499    }
1500
1501    /**
1502     * Sets the transport connectors which this broker will listen on for new
1503     * clients
1504     *
1505     * @org.apache.xbean.Property
1506     *                            nestedType="org.apache.activemq.broker.TransportConnector"
1507     */
1508    public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1509        for (TransportConnector connector : transportConnectors) {
1510            addConnector(connector);
1511        }
1512    }
1513
1514    public TransportConnector getTransportConnectorByName(String name){
1515        for (TransportConnector transportConnector : transportConnectors){
1516           if (name.equals(transportConnector.getName())){
1517               return transportConnector;
1518           }
1519        }
1520        return null;
1521    }
1522
1523    public TransportConnector getTransportConnectorByScheme(String scheme){
1524        for (TransportConnector transportConnector : transportConnectors){
1525            if (scheme.equals(transportConnector.getUri().getScheme())){
1526                return transportConnector;
1527            }
1528        }
1529        return null;
1530    }
1531
1532    public List<NetworkConnector> getNetworkConnectors() {
1533        return new ArrayList<NetworkConnector>(networkConnectors);
1534    }
1535
1536    public List<ProxyConnector> getProxyConnectors() {
1537        return new ArrayList<ProxyConnector>(proxyConnectors);
1538    }
1539
1540    /**
1541     * Sets the network connectors which this broker will use to connect to
1542     * other brokers in a federated network
1543     *
1544     * @org.apache.xbean.Property
1545     *                            nestedType="org.apache.activemq.network.NetworkConnector"
1546     */
1547    public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
1548        for (Object connector : networkConnectors) {
1549            addNetworkConnector((NetworkConnector) connector);
1550        }
1551    }
1552
1553    /**
1554     * Sets the network connectors which this broker will use to connect to
1555     * other brokers in a federated network
1556     */
1557    public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
1558        for (Object connector : proxyConnectors) {
1559            addProxyConnector((ProxyConnector) connector);
1560        }
1561    }
1562
1563    public PolicyMap getDestinationPolicy() {
1564        return destinationPolicy;
1565    }
1566
1567    /**
1568     * Sets the destination specific policies available either for exact
1569     * destinations or for wildcard areas of destinations.
1570     */
1571    public void setDestinationPolicy(PolicyMap policyMap) {
1572        this.destinationPolicy = policyMap;
1573    }
1574
1575    public BrokerPlugin[] getPlugins() {
1576        return plugins;
1577    }
1578
1579    /**
1580     * Sets a number of broker plugins to install such as for security
1581     * authentication or authorization
1582     */
1583    public void setPlugins(BrokerPlugin[] plugins) {
1584        this.plugins = plugins;
1585    }
1586
1587    public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1588        return messageAuthorizationPolicy;
1589    }
1590
1591    /**
1592     * Sets the policy used to decide if the current connection is authorized to
1593     * consume a given message
1594     */
1595    public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1596        this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1597    }
1598
1599    /**
1600     * Delete all messages from the persistent store
1601     *
1602     * @throws IOException
1603     */
1604    public void deleteAllMessages() throws IOException {
1605        getPersistenceAdapter().deleteAllMessages();
1606    }
1607
1608    public boolean isDeleteAllMessagesOnStartup() {
1609        return deleteAllMessagesOnStartup;
1610    }
1611
1612    /**
1613     * Sets whether or not all messages are deleted on startup - mostly only
1614     * useful for testing.
1615     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1616     */
1617    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1618        this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1619    }
1620
1621    public URI getVmConnectorURI() {
1622        if (vmConnectorURI == null) {
1623            try {
1624                vmConnectorURI = new URI("vm://" + getBrokerName());
1625            } catch (URISyntaxException e) {
1626                LOG.error("Badly formed URI from {}", getBrokerName(), e);
1627            }
1628        }
1629        return vmConnectorURI;
1630    }
1631
1632    public void setVmConnectorURI(URI vmConnectorURI) {
1633        this.vmConnectorURI = vmConnectorURI;
1634    }
1635
1636    public String getDefaultSocketURIString() {
1637        if (started.get()) {
1638            if (this.defaultSocketURIString == null) {
1639                for (TransportConnector tc:this.transportConnectors) {
1640                    String result = null;
1641                    try {
1642                        result = tc.getPublishableConnectString();
1643                    } catch (Exception e) {
1644                      LOG.warn("Failed to get the ConnectURI for {}", tc, e);
1645                    }
1646                    if (result != null) {
1647                        // find first publishable uri
1648                        if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
1649                            this.defaultSocketURIString = result;
1650                            break;
1651                        } else {
1652                        // or use the first defined
1653                            if (this.defaultSocketURIString == null) {
1654                                this.defaultSocketURIString = result;
1655                            }
1656                        }
1657                    }
1658                }
1659
1660            }
1661            return this.defaultSocketURIString;
1662        }
1663       return null;
1664    }
1665
1666    /**
1667     * @return Returns the shutdownOnMasterFailure.
1668     */
1669    public boolean isShutdownOnMasterFailure() {
1670        return shutdownOnMasterFailure;
1671    }
1672
1673    /**
1674     * @param shutdownOnMasterFailure
1675     *            The shutdownOnMasterFailure to set.
1676     */
1677    public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1678        this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1679    }
1680
1681    public boolean isKeepDurableSubsActive() {
1682        return keepDurableSubsActive;
1683    }
1684
1685    public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1686        this.keepDurableSubsActive = keepDurableSubsActive;
1687    }
1688
1689    public boolean isUseVirtualTopics() {
1690        return useVirtualTopics;
1691    }
1692
1693    /**
1694     * Sets whether or not <a
1695     * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1696     * Topics</a> should be supported by default if they have not been
1697     * explicitly configured.
1698     */
1699    public void setUseVirtualTopics(boolean useVirtualTopics) {
1700        this.useVirtualTopics = useVirtualTopics;
1701    }
1702
1703    public DestinationInterceptor[] getDestinationInterceptors() {
1704        return destinationInterceptors;
1705    }
1706
1707    public boolean isUseMirroredQueues() {
1708        return useMirroredQueues;
1709    }
1710
1711    /**
1712     * Sets whether or not <a
1713     * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1714     * Queues</a> should be supported by default if they have not been
1715     * explicitly configured.
1716     */
1717    public void setUseMirroredQueues(boolean useMirroredQueues) {
1718        this.useMirroredQueues = useMirroredQueues;
1719    }
1720
1721    /**
1722     * Sets the destination interceptors to use
1723     */
1724    public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1725        this.destinationInterceptors = destinationInterceptors;
1726    }
1727
1728    public ActiveMQDestination[] getDestinations() {
1729        return destinations;
1730    }
1731
1732    /**
1733     * Sets the destinations which should be loaded/created on startup
1734     */
1735    public void setDestinations(ActiveMQDestination[] destinations) {
1736        this.destinations = destinations;
1737    }
1738
1739    /**
1740     * @return the tempDataStore
1741     */
1742    public synchronized PListStore getTempDataStore() {
1743        if (tempDataStore == null) {
1744            if (!isPersistent()) {
1745                return null;
1746            }
1747
1748            try {
1749                PersistenceAdapter pa = getPersistenceAdapter();
1750                if( pa!=null && pa instanceof PListStore) {
1751                    return (PListStore) pa;
1752                }
1753            } catch (IOException e) {
1754                throw new RuntimeException(e);
1755            }
1756
1757            try {
1758                String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
1759                this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance();
1760                this.tempDataStore.setDirectory(getTmpDataDirectory());
1761                configureService(tempDataStore);
1762            } catch (Exception e) {
1763                throw new RuntimeException(e);
1764            }
1765        }
1766        return tempDataStore;
1767    }
1768
1769    /**
1770     * @param tempDataStore
1771     *            the tempDataStore to set
1772     */
1773    public void setTempDataStore(PListStore tempDataStore) {
1774        this.tempDataStore = tempDataStore;
1775        configureService(tempDataStore);
1776    }
1777
1778    public int getPersistenceThreadPriority() {
1779        return persistenceThreadPriority;
1780    }
1781
1782    public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1783        this.persistenceThreadPriority = persistenceThreadPriority;
1784    }
1785
1786    /**
1787     * @return the useLocalHostBrokerName
1788     */
1789    public boolean isUseLocalHostBrokerName() {
1790        return this.useLocalHostBrokerName;
1791    }
1792
1793    /**
1794     * @param useLocalHostBrokerName
1795     *            the useLocalHostBrokerName to set
1796     */
1797    public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1798        this.useLocalHostBrokerName = useLocalHostBrokerName;
1799        if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1800            brokerName = LOCAL_HOST_NAME;
1801        }
1802    }
1803
1804    /**
1805     * Looks up and lazily creates if necessary the destination for the given
1806     * JMS name
1807     */
1808    public Destination getDestination(ActiveMQDestination destination) throws Exception {
1809        return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1810    }
1811
1812    public void removeDestination(ActiveMQDestination destination) throws Exception {
1813        getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1814    }
1815
1816    public int getProducerSystemUsagePortion() {
1817        return producerSystemUsagePortion;
1818    }
1819
1820    public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1821        this.producerSystemUsagePortion = producerSystemUsagePortion;
1822    }
1823
1824    public int getConsumerSystemUsagePortion() {
1825        return consumerSystemUsagePortion;
1826    }
1827
1828    public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1829        this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1830    }
1831
1832    public boolean isSplitSystemUsageForProducersConsumers() {
1833        return splitSystemUsageForProducersConsumers;
1834    }
1835
1836    public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1837        this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1838    }
1839
1840    public boolean isMonitorConnectionSplits() {
1841        return monitorConnectionSplits;
1842    }
1843
1844    public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1845        this.monitorConnectionSplits = monitorConnectionSplits;
1846    }
1847
1848    public int getTaskRunnerPriority() {
1849        return taskRunnerPriority;
1850    }
1851
1852    public void setTaskRunnerPriority(int taskRunnerPriority) {
1853        this.taskRunnerPriority = taskRunnerPriority;
1854    }
1855
1856    public boolean isDedicatedTaskRunner() {
1857        return dedicatedTaskRunner;
1858    }
1859
1860    public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1861        this.dedicatedTaskRunner = dedicatedTaskRunner;
1862    }
1863
1864    public boolean isCacheTempDestinations() {
1865        return cacheTempDestinations;
1866    }
1867
1868    public void setCacheTempDestinations(boolean cacheTempDestinations) {
1869        this.cacheTempDestinations = cacheTempDestinations;
1870    }
1871
1872    public int getTimeBeforePurgeTempDestinations() {
1873        return timeBeforePurgeTempDestinations;
1874    }
1875
1876    public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1877        this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1878    }
1879
1880    public boolean isUseTempMirroredQueues() {
1881        return useTempMirroredQueues;
1882    }
1883
1884    public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1885        this.useTempMirroredQueues = useTempMirroredQueues;
1886    }
1887
1888    public synchronized JobSchedulerStore getJobSchedulerStore() {
1889
1890        // If support is off don't allow any scheduler even is user configured their own.
1891        if (!isSchedulerSupport()) {
1892            return null;
1893        }
1894
1895        // If the user configured their own we use it even if persistence is disabled since
1896        // we don't know anything about their implementation.
1897        if (jobSchedulerStore == null) {
1898
1899            if (!isPersistent()) {
1900                this.jobSchedulerStore = new InMemoryJobSchedulerStore();
1901                configureService(jobSchedulerStore);
1902                return this.jobSchedulerStore;
1903            }
1904
1905            try {
1906                PersistenceAdapter pa = getPersistenceAdapter();
1907                if (pa != null) {
1908                    this.jobSchedulerStore = pa.createJobSchedulerStore();
1909                    jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1910                    configureService(jobSchedulerStore);
1911                    return this.jobSchedulerStore;
1912                }
1913            } catch (IOException e) {
1914                throw new RuntimeException(e);
1915            } catch (UnsupportedOperationException ex) {
1916                // It's ok if the store doesn't implement a scheduler.
1917            } catch (Exception e) {
1918                throw new RuntimeException(e);
1919            }
1920
1921            try {
1922                PersistenceAdapter pa = getPersistenceAdapter();
1923                if (pa != null && pa instanceof JobSchedulerStore) {
1924                    this.jobSchedulerStore = (JobSchedulerStore) pa;
1925                    configureService(jobSchedulerStore);
1926                    return this.jobSchedulerStore;
1927                }
1928            } catch (IOException e) {
1929                throw new RuntimeException(e);
1930            }
1931
1932            // Load the KahaDB store as a last resort, this only works if KahaDB is
1933            // included at runtime, otherwise this will fail.  User should disable
1934            // scheduler support if this fails.
1935            try {
1936                String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
1937                PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
1938                jobSchedulerStore = adaptor.createJobSchedulerStore();
1939                jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1940                configureService(jobSchedulerStore);
1941                LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile());
1942            } catch (Exception e) {
1943                throw new RuntimeException(e);
1944            }
1945        }
1946        return jobSchedulerStore;
1947    }
1948
1949    public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
1950        this.jobSchedulerStore = jobSchedulerStore;
1951        configureService(jobSchedulerStore);
1952    }
1953
1954    //
1955    // Implementation methods
1956    // -------------------------------------------------------------------------
1957    /**
1958     * Handles any lazy-creation helper properties which are added to make
1959     * things easier to configure inside environments such as Spring
1960     *
1961     * @throws Exception
1962     */
1963    protected void processHelperProperties() throws Exception {
1964        if (transportConnectorURIs != null) {
1965            for (int i = 0; i < transportConnectorURIs.length; i++) {
1966                String uri = transportConnectorURIs[i];
1967                addConnector(uri);
1968            }
1969        }
1970        if (networkConnectorURIs != null) {
1971            for (int i = 0; i < networkConnectorURIs.length; i++) {
1972                String uri = networkConnectorURIs[i];
1973                addNetworkConnector(uri);
1974            }
1975        }
1976        if (jmsBridgeConnectors != null) {
1977            for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1978                addJmsConnector(jmsBridgeConnectors[i]);
1979            }
1980        }
1981    }
1982
1983    /**
1984     * Check that the store usage limit is not greater than max usable
1985     * space and adjust if it is
1986     */
1987    protected void checkStoreUsageLimits() throws Exception {
1988        final SystemUsage usage = getSystemUsage();
1989
1990        if (getPersistenceAdapter() != null) {
1991            PersistenceAdapter adapter = getPersistenceAdapter();
1992            checkUsageLimit(adapter.getDirectory(), usage.getStoreUsage(), usage.getStoreUsage().getPercentLimit());
1993
1994            long maxJournalFileSize = 0;
1995            long storeLimit = usage.getStoreUsage().getLimit();
1996
1997            if (adapter instanceof JournaledStore) {
1998                maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
1999            }
2000
2001            if (storeLimit > 0 && storeLimit < maxJournalFileSize) {
2002                LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
2003                          " mb, whilst the max journal file size for the store is: " +
2004                          maxJournalFileSize / (1024 * 1024) + " mb, " +
2005                          "the store will not accept any data when used.");
2006
2007            }
2008        }
2009    }
2010
2011    /**
2012     * Check that temporary usage limit is not greater than max usable
2013     * space and adjust if it is
2014     */
2015    protected void checkTmpStoreUsageLimits() throws Exception {
2016        final SystemUsage usage = getSystemUsage();
2017
2018        File tmpDir = getTmpDataDirectory();
2019
2020        if (tmpDir != null) {
2021            checkUsageLimit(tmpDir, usage.getTempUsage(), usage.getTempUsage().getPercentLimit());
2022
2023            if (isPersistent()) {
2024                long maxJournalFileSize;
2025
2026                PListStore store = usage.getTempUsage().getStore();
2027                if (store != null && store instanceof JournaledStore) {
2028                    maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
2029                } else {
2030                    maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
2031                }
2032                long storeLimit = usage.getTempUsage().getLimit();
2033
2034                if (storeLimit > 0 && storeLimit < maxJournalFileSize) {
2035                    LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
2036                              " mb, whilst the max journal file size for the temporary store is: " +
2037                              maxJournalFileSize / (1024 * 1024) + " mb, " +
2038                              "the temp store will not accept any data when used.");
2039                }
2040            }
2041        }
2042    }
2043
2044    protected void checkUsageLimit(File dir, Usage<?> storeUsage, int percentLimit) throws ConfigurationException {
2045        if (dir != null) {
2046            dir = StoreUtil.findParentDirectory(dir);
2047            String storeName = storeUsage instanceof StoreUsage ? "Store" : "Temporary Store";
2048            long storeLimit = storeUsage.getLimit();
2049            long storeCurrent = storeUsage.getUsage();
2050            long totalSpace = dir.getTotalSpace();
2051            long totalUsableSpace = dir.getUsableSpace() + storeCurrent;
2052            //compute byte value of the percent limit
2053            long bytePercentLimit = totalSpace * percentLimit / 100;
2054            int oneMeg = 1024 * 1024;
2055
2056            //Check if the store limit is less than the percent Limit that was set and also
2057            //the usable space...this means we can grow the store larger
2058            //Changes in partition size (total space) as well as changes in usable space should
2059            //be detected here
2060            if (diskUsageCheckRegrowThreshold > -1 && percentLimit > 0
2061                    && storeLimit < bytePercentLimit && storeLimit < totalUsableSpace){
2062
2063                // set the limit to be bytePercentLimit or usableSpace if
2064                // usableSpace is less than the percentLimit
2065                long newLimit = bytePercentLimit > totalUsableSpace ? totalUsableSpace : bytePercentLimit;
2066
2067                //To prevent changing too often, check threshold
2068                if (newLimit - storeLimit >= diskUsageCheckRegrowThreshold) {
2069                    LOG.info("Usable disk space has been increased, attempting to regrow " + storeName + " limit to "
2070                            + percentLimit + "% of the partition size.");
2071                    storeUsage.setLimit(newLimit);
2072                    LOG.info(storeName + " limit has been increased to " + newLimit * 100 / totalSpace
2073                            + "% (" + newLimit / oneMeg + " mb) of the partition size.");
2074                }
2075
2076            //check if the limit is too large for the amount of usable space
2077            } else if (storeLimit > totalUsableSpace) {
2078                final String message = storeName + " limit is " +  storeLimit / oneMeg
2079                        + " mb (current store usage is " + storeCurrent / oneMeg
2080                        + " mb). The data directory: " + dir.getAbsolutePath()
2081                        + " only has " + totalUsableSpace / oneMeg
2082                        + " mb of usable space.";
2083
2084                if (!isAdjustUsageLimits()) {
2085                    LOG.error(message);
2086                    throw new ConfigurationException(message);
2087                }
2088
2089                if (percentLimit > 0) {
2090                    LOG.warn(storeName + " limit has been set to "
2091                            + percentLimit + "% (" + bytePercentLimit / oneMeg + " mb)"
2092                            + " of the partition size but there is not enough usable space."
2093                            + " The current store limit (which may have been adjusted by a"
2094                            + " previous usage limit check) is set to (" + storeLimit / oneMeg + " mb)"
2095                            + " but only " + totalUsableSpace * 100 / totalSpace + "% (" + totalUsableSpace / oneMeg + " mb)"
2096                            + " is available - resetting limit");
2097                } else {
2098                    LOG.warn(message + " - resetting to maximum available disk space: " +
2099                            totalUsableSpace / oneMeg + " mb");
2100                }
2101                storeUsage.setLimit(totalUsableSpace);
2102            }
2103        }
2104    }
2105
2106    /**
2107     * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to
2108     * update store and temporary store limits if the amount of available space
2109     * plus current store size is less than the existin configured limit
2110     */
2111    protected void scheduleDiskUsageLimitsCheck() throws IOException {
2112        if (schedulePeriodForDiskUsageCheck > 0 &&
2113                (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) {
2114            Runnable diskLimitCheckTask = new Runnable() {
2115                @Override
2116                public void run() {
2117                    try {
2118                        checkStoreUsageLimits();
2119                    } catch (Exception e) {
2120                        LOG.error("Failed to check persistent disk usage limits", e);
2121                    }
2122
2123                    try {
2124                        checkTmpStoreUsageLimits();
2125                    } catch (Exception e) {
2126                        LOG.error("Failed to check temporary store usage limits", e);
2127                    }
2128                }
2129            };
2130            scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck);
2131        }
2132    }
2133
2134    protected void checkMemorySystemUsageLimits() throws Exception {
2135        final SystemUsage usage = getSystemUsage();
2136        long memLimit = usage.getMemoryUsage().getLimit();
2137        long jvmLimit = Runtime.getRuntime().maxMemory();
2138
2139        if (memLimit > jvmLimit) {
2140            final String message = "Memory Usage for the Broker (" + memLimit / (1024 * 1024)
2141                    + "mb) is more than the maximum available for the JVM: " + jvmLimit / (1024 * 1024);
2142
2143            if (adjustUsageLimits) {
2144                usage.getMemoryUsage().setPercentOfJvmHeap(70);
2145                LOG.warn(message + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
2146            } else {
2147                LOG.error(message);
2148                throw new ConfigurationException(message);
2149            }
2150        }
2151    }
2152
2153    protected void checkStoreSystemUsageLimits() throws Exception {
2154        final SystemUsage usage = getSystemUsage();
2155
2156        //Check the persistent store and temp store limits if they exist
2157        //and schedule a periodic check to update disk limits if
2158        //schedulePeriodForDiskLimitCheck is set
2159        checkStoreUsageLimits();
2160        checkTmpStoreUsageLimits();
2161        scheduleDiskUsageLimitsCheck();
2162
2163        if (getJobSchedulerStore() != null) {
2164            JobSchedulerStore scheduler = getJobSchedulerStore();
2165            File schedulerDir = scheduler.getDirectory();
2166            if (schedulerDir != null) {
2167
2168                String schedulerDirPath = schedulerDir.getAbsolutePath();
2169                if (!schedulerDir.isAbsolute()) {
2170                    schedulerDir = new File(schedulerDirPath);
2171                }
2172
2173                while (schedulerDir != null && !schedulerDir.isDirectory()) {
2174                    schedulerDir = schedulerDir.getParentFile();
2175                }
2176                long schedulerLimit = usage.getJobSchedulerUsage().getLimit();
2177                long dirFreeSpace = schedulerDir.getUsableSpace();
2178                if (schedulerLimit > dirFreeSpace) {
2179                    LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) +
2180                             " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
2181                             " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " +
2182                            dirFreeSpace / (1024 * 1024) + " mb.");
2183                    usage.getJobSchedulerUsage().setLimit(dirFreeSpace);
2184                }
2185            }
2186        }
2187    }
2188
2189    public void stopAllConnectors(ServiceStopper stopper) {
2190        for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2191            NetworkConnector connector = iter.next();
2192            unregisterNetworkConnectorMBean(connector);
2193            stopper.stop(connector);
2194        }
2195        for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2196            ProxyConnector connector = iter.next();
2197            stopper.stop(connector);
2198        }
2199        for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2200            JmsConnector connector = iter.next();
2201            stopper.stop(connector);
2202        }
2203        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2204            TransportConnector connector = iter.next();
2205            try {
2206                unregisterConnectorMBean(connector);
2207            } catch (IOException e) {
2208            }
2209            stopper.stop(connector);
2210        }
2211    }
2212
2213    protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
2214        try {
2215            ObjectName objectName = createConnectorObjectName(connector);
2216            connector = connector.asManagedConnector(getManagementContext(), objectName);
2217            ConnectorViewMBean view = new ConnectorView(connector);
2218            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2219            return connector;
2220        } catch (Throwable e) {
2221            throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e);
2222        }
2223    }
2224
2225    protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
2226        if (isUseJmx()) {
2227            try {
2228                ObjectName objectName = createConnectorObjectName(connector);
2229                getManagementContext().unregisterMBean(objectName);
2230            } catch (Throwable e) {
2231                throw IOExceptionSupport.create(
2232                        "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
2233            }
2234        }
2235    }
2236
2237    protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
2238        return adaptor;
2239    }
2240
2241    protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
2242        if (isUseJmx()) {}
2243    }
2244
2245    private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
2246        return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName());
2247    }
2248
2249    public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
2250        NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
2251        try {
2252            ObjectName objectName = createNetworkConnectorObjectName(connector);
2253            connector.setObjectName(objectName);
2254            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2255        } catch (Throwable e) {
2256            throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
2257        }
2258    }
2259
2260    protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
2261        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
2262    }
2263
2264    public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
2265        return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport);
2266    }
2267
2268    protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
2269        if (isUseJmx()) {
2270            try {
2271                ObjectName objectName = createNetworkConnectorObjectName(connector);
2272                getManagementContext().unregisterMBean(objectName);
2273            } catch (Exception e) {
2274                LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e);
2275            }
2276        }
2277    }
2278
2279    protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
2280        ProxyConnectorView view = new ProxyConnectorView(connector);
2281        try {
2282            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName());
2283            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2284        } catch (Throwable e) {
2285            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2286        }
2287    }
2288
2289    protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
2290        JmsConnectorView view = new JmsConnectorView(connector);
2291        try {
2292            ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName());
2293            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2294        } catch (Throwable e) {
2295            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2296        }
2297    }
2298
2299    /**
2300     * Factory method to create a new broker
2301     *
2302     * @throws Exception
2303     */
2304    protected Broker createBroker() throws Exception {
2305        regionBroker = createRegionBroker();
2306        Broker broker = addInterceptors(regionBroker);
2307        // Add a filter that will stop access to the broker once stopped
2308        broker = new MutableBrokerFilter(broker) {
2309            Broker old;
2310
2311            @Override
2312            public void stop() throws Exception {
2313                old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
2314                    // Just ignore additional stop actions.
2315                    @Override
2316                    public void stop() throws Exception {
2317                    }
2318                });
2319                old.stop();
2320            }
2321
2322            @Override
2323            public void start() throws Exception {
2324                if (forceStart && old != null) {
2325                    this.next.set(old);
2326                }
2327                getNext().start();
2328            }
2329        };
2330        return broker;
2331    }
2332
2333    /**
2334     * Factory method to create the core region broker onto which interceptors
2335     * are added
2336     *
2337     * @throws Exception
2338     */
2339    protected Broker createRegionBroker() throws Exception {
2340        if (destinationInterceptors == null) {
2341            destinationInterceptors = createDefaultDestinationInterceptor();
2342        }
2343        configureServices(destinationInterceptors);
2344        DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
2345        if (destinationFactory == null) {
2346            destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
2347        }
2348        return createRegionBroker(destinationInterceptor);
2349    }
2350
2351    protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
2352        RegionBroker regionBroker;
2353        if (isUseJmx()) {
2354            try {
2355                regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
2356                    getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
2357            } catch(MalformedObjectNameException me){
2358                LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me);
2359                throw new IOException(me);
2360            }
2361        } else {
2362            regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
2363                    destinationInterceptor,getScheduler(),getExecutor());
2364        }
2365        destinationFactory.setRegionBroker(regionBroker);
2366        regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
2367        regionBroker.setBrokerName(getBrokerName());
2368        regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
2369        regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
2370        if (brokerId != null) {
2371            regionBroker.setBrokerId(brokerId);
2372        }
2373        return regionBroker;
2374    }
2375
2376    /**
2377     * Create the default destination interceptor
2378     */
2379    protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
2380        List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
2381        if (isUseVirtualTopics()) {
2382            VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
2383            VirtualTopic virtualTopic = new VirtualTopic();
2384            virtualTopic.setName("VirtualTopic.>");
2385            VirtualDestination[] virtualDestinations = { virtualTopic };
2386            interceptor.setVirtualDestinations(virtualDestinations);
2387            answer.add(interceptor);
2388        }
2389        if (isUseMirroredQueues()) {
2390            MirroredQueue interceptor = new MirroredQueue();
2391            answer.add(interceptor);
2392        }
2393        DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
2394        answer.toArray(array);
2395        return array;
2396    }
2397
2398    /**
2399     * Strategy method to add interceptors to the broker
2400     *
2401     * @throws IOException
2402     */
2403    protected Broker addInterceptors(Broker broker) throws Exception {
2404        if (isSchedulerSupport()) {
2405            SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
2406            if (isUseJmx()) {
2407                JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
2408                try {
2409                    ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName());
2410                    AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2411                    this.adminView.setJMSJobScheduler(objectName);
2412                } catch (Throwable e) {
2413                    throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
2414                            + e.getMessage(), e);
2415                }
2416            }
2417            broker = sb;
2418        }
2419        if (isUseJmx()) {
2420            HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker());
2421            try {
2422                ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName());
2423                AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName);
2424            } catch (Throwable e) {
2425                throw IOExceptionSupport.create("Status MBean could not be registered in JMX: "
2426                        + e.getMessage(), e);
2427            }
2428        }
2429        if (isAdvisorySupport()) {
2430            broker = new AdvisoryBroker(broker);
2431        }
2432        broker = new CompositeDestinationBroker(broker);
2433        broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
2434        if (isPopulateJMSXUserID()) {
2435            UserIDBroker userIDBroker = new UserIDBroker(broker);
2436            userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
2437            broker = userIDBroker;
2438        }
2439        if (isMonitorConnectionSplits()) {
2440            broker = new ConnectionSplitBroker(broker);
2441        }
2442        if (plugins != null) {
2443            for (int i = 0; i < plugins.length; i++) {
2444                BrokerPlugin plugin = plugins[i];
2445                broker = plugin.installPlugin(broker);
2446            }
2447        }
2448        return broker;
2449    }
2450
2451    protected PersistenceAdapter createPersistenceAdapter() throws IOException {
2452        if (isPersistent()) {
2453            PersistenceAdapterFactory fac = getPersistenceFactory();
2454            if (fac != null) {
2455                return fac.createPersistenceAdapter();
2456            } else {
2457                try {
2458                    String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
2459                    PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
2460                    File dir = new File(getBrokerDataDirectory(),"KahaDB");
2461                    adaptor.setDirectory(dir);
2462                    return adaptor;
2463                } catch (Throwable e) {
2464                    throw IOExceptionSupport.create(e);
2465                }
2466            }
2467        } else {
2468            return new MemoryPersistenceAdapter();
2469        }
2470    }
2471
2472    protected ObjectName createBrokerObjectName() throws MalformedObjectNameException  {
2473        return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName());
2474    }
2475
2476    protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2477        TransportServer transport = TransportFactorySupport.bind(this, brokerURI);
2478        return new TransportConnector(transport);
2479    }
2480
2481    /**
2482     * Extracts the port from the options
2483     */
2484    protected Object getPort(Map<?,?> options) {
2485        Object port = options.get("port");
2486        if (port == null) {
2487            port = DEFAULT_PORT;
2488            LOG.warn("No port specified so defaulting to: {}", port);
2489        }
2490        return port;
2491    }
2492
2493    protected void addShutdownHook() {
2494        if (useShutdownHook) {
2495            shutdownHook = new Thread("ActiveMQ ShutdownHook") {
2496                @Override
2497                public void run() {
2498                    containerShutdown();
2499                }
2500            };
2501            Runtime.getRuntime().addShutdownHook(shutdownHook);
2502        }
2503    }
2504
2505    protected void removeShutdownHook() {
2506        if (shutdownHook != null) {
2507            try {
2508                Runtime.getRuntime().removeShutdownHook(shutdownHook);
2509            } catch (Exception e) {
2510                LOG.debug("Caught exception, must be shutting down. This exception is ignored.", e);
2511            }
2512        }
2513    }
2514
2515    /**
2516     * Sets hooks to be executed when broker shut down
2517     *
2518     * @org.apache.xbean.Property
2519     */
2520    public void setShutdownHooks(List<Runnable> hooks) throws Exception {
2521        for (Runnable hook : hooks) {
2522            addShutdownHook(hook);
2523        }
2524    }
2525
2526    /**
2527     * Causes a clean shutdown of the container when the VM is being shut down
2528     */
2529    protected void containerShutdown() {
2530        try {
2531            stop();
2532        } catch (IOException e) {
2533            Throwable linkedException = e.getCause();
2534            if (linkedException != null) {
2535                logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2536            } else {
2537                logError("Failed to shut down: " + e, e);
2538            }
2539            if (!useLoggingForShutdownErrors) {
2540                e.printStackTrace(System.err);
2541            }
2542        } catch (Exception e) {
2543            logError("Failed to shut down: " + e, e);
2544        }
2545    }
2546
2547    protected void logError(String message, Throwable e) {
2548        if (useLoggingForShutdownErrors) {
2549            LOG.error("Failed to shut down: " + e);
2550        } else {
2551            System.err.println("Failed to shut down: " + e);
2552        }
2553    }
2554
2555    /**
2556     * Starts any configured destinations on startup
2557     */
2558    protected void startDestinations() throws Exception {
2559        if (destinations != null) {
2560            ConnectionContext adminConnectionContext = getAdminConnectionContext();
2561            for (int i = 0; i < destinations.length; i++) {
2562                ActiveMQDestination destination = destinations[i];
2563                getBroker().addDestination(adminConnectionContext, destination,true);
2564            }
2565        }
2566        if (isUseVirtualTopics()) {
2567            startVirtualConsumerDestinations();
2568        }
2569    }
2570
2571    /**
2572     * Returns the broker's administration connection context used for
2573     * configuring the broker at startup
2574     */
2575    public ConnectionContext getAdminConnectionContext() throws Exception {
2576        return BrokerSupport.getConnectionContext(getBroker());
2577    }
2578
2579    protected void startManagementContext() throws Exception {
2580        getManagementContext().setBrokerName(brokerName);
2581        getManagementContext().start();
2582        adminView = new BrokerView(this, null);
2583        ObjectName objectName = getBrokerObjectName();
2584        AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2585    }
2586
2587    /**
2588     * Start all transport and network connections, proxies and bridges
2589     *
2590     * @throws Exception
2591     */
2592    public void startAllConnectors() throws Exception {
2593        Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2594        List<TransportConnector> al = new ArrayList<TransportConnector>();
2595        for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2596            TransportConnector connector = iter.next();
2597            al.add(startTransportConnector(connector));
2598        }
2599        if (al.size() > 0) {
2600            // let's clear the transportConnectors list and replace it with
2601            // the started transportConnector instances
2602            this.transportConnectors.clear();
2603            setTransportConnectors(al);
2604        }
2605        this.slave = false;
2606        URI uri = getVmConnectorURI();
2607        Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
2608        map.put("async", "false");
2609        map.put("create","false");
2610        uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
2611
2612        if (!stopped.get()) {
2613            ThreadPoolExecutor networkConnectorStartExecutor = null;
2614            if (isNetworkConnectorStartAsync()) {
2615                // spin up as many threads as needed
2616                networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2617                    10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2618                    new ThreadFactory() {
2619                        int count=0;
2620                        @Override
2621                        public Thread newThread(Runnable runnable) {
2622                            Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2623                            thread.setDaemon(true);
2624                            return thread;
2625                        }
2626                    });
2627            }
2628
2629            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2630                final NetworkConnector connector = iter.next();
2631                connector.setLocalUri(uri);
2632                connector.setBrokerName(getBrokerName());
2633                connector.setDurableDestinations(durableDestinations);
2634                if (getDefaultSocketURIString() != null) {
2635                    connector.setBrokerURL(getDefaultSocketURIString());
2636                }
2637                if (networkConnectorStartExecutor != null) {
2638                    networkConnectorStartExecutor.execute(new Runnable() {
2639                        @Override
2640                        public void run() {
2641                            try {
2642                                LOG.info("Async start of {}", connector);
2643                                connector.start();
2644                            } catch(Exception e) {
2645                                LOG.error("Async start of network connector: {} failed", connector, e);
2646                            }
2647                        }
2648                    });
2649                } else {
2650                    connector.start();
2651                }
2652            }
2653            if (networkConnectorStartExecutor != null) {
2654                // executor done when enqueued tasks are complete
2655                ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
2656            }
2657
2658            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2659                ProxyConnector connector = iter.next();
2660                connector.start();
2661            }
2662            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2663                JmsConnector connector = iter.next();
2664                connector.start();
2665            }
2666            for (Service service : services) {
2667                configureService(service);
2668                service.start();
2669            }
2670        }
2671    }
2672
2673    public TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2674        connector.setBrokerService(this);
2675        connector.setTaskRunnerFactory(getTaskRunnerFactory());
2676        MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2677        if (policy != null) {
2678            connector.setMessageAuthorizationPolicy(policy);
2679        }
2680        if (isUseJmx()) {
2681            connector = registerConnectorMBean(connector);
2682        }
2683        connector.getStatistics().setEnabled(enableStatistics);
2684        connector.start();
2685        return connector;
2686    }
2687
2688    /**
2689     * Perform any custom dependency injection
2690     */
2691    protected void configureServices(Object[] services) {
2692        for (Object service : services) {
2693            configureService(service);
2694        }
2695    }
2696
2697    /**
2698     * Perform any custom dependency injection
2699     */
2700    protected void configureService(Object service) {
2701        if (service instanceof BrokerServiceAware) {
2702            BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2703            serviceAware.setBrokerService(this);
2704        }
2705    }
2706
2707    public void handleIOException(IOException exception) {
2708        if (ioExceptionHandler != null) {
2709            ioExceptionHandler.handle(exception);
2710         } else {
2711            LOG.info("No IOExceptionHandler registered, ignoring IO exception", exception);
2712         }
2713    }
2714
2715    protected void startVirtualConsumerDestinations() throws Exception {
2716        checkStartException();
2717        ConnectionContext adminConnectionContext = getAdminConnectionContext();
2718        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
2719        DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
2720        if (!destinations.isEmpty()) {
2721            for (ActiveMQDestination destination : destinations) {
2722                if (filter.matches(destination) == true) {
2723                    broker.addDestination(adminConnectionContext, destination, false);
2724                }
2725            }
2726        }
2727    }
2728
2729    private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
2730        // created at startup, so no sync needed
2731        if (virtualConsumerDestinationFilter == null) {
2732            Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
2733            if (destinationInterceptors != null) {
2734                for (DestinationInterceptor interceptor : destinationInterceptors) {
2735                    if (interceptor instanceof VirtualDestinationInterceptor) {
2736                        VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
2737                        for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
2738                            if (virtualDestination instanceof VirtualTopic) {
2739                                consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
2740                            }
2741                            if (isUseVirtualDestSubs()) {
2742                                try {
2743                                    broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination);
2744                                    LOG.debug("Adding virtual destination: {}", virtualDestination);
2745                                } catch (Exception e) {
2746                                    LOG.warn("Could not fire virtual destination consumer advisory", e);
2747                                }
2748                            }
2749                        }
2750                    }
2751                }
2752            }
2753            ActiveMQQueue filter = new ActiveMQQueue();
2754            filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
2755            virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
2756        }
2757        return virtualConsumerDestinationFilter;
2758    }
2759
2760    protected synchronized ThreadPoolExecutor getExecutor() {
2761        if (this.executor == null) {
2762            this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2763
2764                private long i = 0;
2765
2766                @Override
2767                public Thread newThread(Runnable runnable) {
2768                    this.i++;
2769                    Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i);
2770                    thread.setDaemon(true);
2771                    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
2772                        @Override
2773                        public void uncaughtException(final Thread t, final Throwable e) {
2774                            LOG.error("Error in thread '{}'", t.getName(), e);
2775                        }
2776                    });
2777                    return thread;
2778                }
2779            }, new RejectedExecutionHandler() {
2780                @Override
2781                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
2782                    try {
2783                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
2784                    } catch (InterruptedException e) {
2785                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
2786                    }
2787
2788                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
2789                }
2790            });
2791        }
2792        return this.executor;
2793    }
2794
2795    public synchronized Scheduler getScheduler() {
2796        if (this.scheduler==null) {
2797            this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2798            try {
2799                this.scheduler.start();
2800            } catch (Exception e) {
2801               LOG.error("Failed to start Scheduler", e);
2802            }
2803        }
2804        return this.scheduler;
2805    }
2806
2807    public Broker getRegionBroker() {
2808        return regionBroker;
2809    }
2810
2811    public void setRegionBroker(Broker regionBroker) {
2812        this.regionBroker = regionBroker;
2813    }
2814
2815    public void addShutdownHook(Runnable hook) {
2816        synchronized (shutdownHooks) {
2817            shutdownHooks.add(hook);
2818        }
2819    }
2820
2821    public void removeShutdownHook(Runnable hook) {
2822        synchronized (shutdownHooks) {
2823            shutdownHooks.remove(hook);
2824        }
2825    }
2826
2827    public boolean isSystemExitOnShutdown() {
2828        return systemExitOnShutdown;
2829    }
2830
2831    /**
2832     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2833     */
2834    public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2835        this.systemExitOnShutdown = systemExitOnShutdown;
2836    }
2837
2838    public int getSystemExitOnShutdownExitCode() {
2839        return systemExitOnShutdownExitCode;
2840    }
2841
2842    public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2843        this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2844    }
2845
2846    public SslContext getSslContext() {
2847        return sslContext;
2848    }
2849
2850    public void setSslContext(SslContext sslContext) {
2851        this.sslContext = sslContext;
2852    }
2853
2854    public boolean isShutdownOnSlaveFailure() {
2855        return shutdownOnSlaveFailure;
2856    }
2857
2858    /**
2859     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2860     */
2861    public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2862        this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2863    }
2864
2865    public boolean isWaitForSlave() {
2866        return waitForSlave;
2867    }
2868
2869    /**
2870     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2871     */
2872    public void setWaitForSlave(boolean waitForSlave) {
2873        this.waitForSlave = waitForSlave;
2874    }
2875
2876    public long getWaitForSlaveTimeout() {
2877        return this.waitForSlaveTimeout;
2878    }
2879
2880    public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
2881        this.waitForSlaveTimeout = waitForSlaveTimeout;
2882    }
2883
2884    /**
2885     * Get the passiveSlave
2886     * @return the passiveSlave
2887     */
2888    public boolean isPassiveSlave() {
2889        return this.passiveSlave;
2890    }
2891
2892    /**
2893     * Set the passiveSlave
2894     * @param passiveSlave the passiveSlave to set
2895     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2896     */
2897    public void setPassiveSlave(boolean passiveSlave) {
2898        this.passiveSlave = passiveSlave;
2899    }
2900
2901    /**
2902     * override the Default IOException handler, called when persistence adapter
2903     * has experiences File or JDBC I/O Exceptions
2904     *
2905     * @param ioExceptionHandler
2906     */
2907    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2908        configureService(ioExceptionHandler);
2909        this.ioExceptionHandler = ioExceptionHandler;
2910    }
2911
2912    public IOExceptionHandler getIoExceptionHandler() {
2913        return ioExceptionHandler;
2914    }
2915
2916    /**
2917     * @return the schedulerSupport
2918     */
2919    public boolean isSchedulerSupport() {
2920        return this.schedulerSupport;
2921    }
2922
2923    /**
2924     * @param schedulerSupport the schedulerSupport to set
2925     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2926     */
2927    public void setSchedulerSupport(boolean schedulerSupport) {
2928        this.schedulerSupport = schedulerSupport;
2929    }
2930
2931    /**
2932     * @return the schedulerDirectory
2933     */
2934    public File getSchedulerDirectoryFile() {
2935        if (this.schedulerDirectoryFile == null) {
2936            this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2937        }
2938        return schedulerDirectoryFile;
2939    }
2940
2941    /**
2942     * @param schedulerDirectory the schedulerDirectory to set
2943     */
2944    public void setSchedulerDirectoryFile(File schedulerDirectory) {
2945        this.schedulerDirectoryFile = schedulerDirectory;
2946    }
2947
2948    public void setSchedulerDirectory(String schedulerDirectory) {
2949        setSchedulerDirectoryFile(new File(schedulerDirectory));
2950    }
2951
2952    public int getSchedulePeriodForDestinationPurge() {
2953        return this.schedulePeriodForDestinationPurge;
2954    }
2955
2956    public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
2957        this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
2958    }
2959
2960    /**
2961     * @param schedulePeriodForDiskUsageCheck
2962     */
2963    public void setSchedulePeriodForDiskUsageCheck(
2964            int schedulePeriodForDiskUsageCheck) {
2965        this.schedulePeriodForDiskUsageCheck = schedulePeriodForDiskUsageCheck;
2966    }
2967
2968    public int getDiskUsageCheckRegrowThreshold() {
2969        return diskUsageCheckRegrowThreshold;
2970    }
2971
2972    /**
2973     * @param diskUsageCheckRegrowThreshold
2974     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
2975     */
2976    public void setDiskUsageCheckRegrowThreshold(int diskUsageCheckRegrowThreshold) {
2977        this.diskUsageCheckRegrowThreshold = diskUsageCheckRegrowThreshold;
2978    }
2979
2980    public int getMaxPurgedDestinationsPerSweep() {
2981        return this.maxPurgedDestinationsPerSweep;
2982    }
2983
2984    public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
2985        this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
2986    }
2987
2988    public BrokerContext getBrokerContext() {
2989        return brokerContext;
2990    }
2991
2992    public void setBrokerContext(BrokerContext brokerContext) {
2993        this.brokerContext = brokerContext;
2994    }
2995
2996    public void setBrokerId(String brokerId) {
2997        this.brokerId = new BrokerId(brokerId);
2998    }
2999
3000    public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
3001        return useAuthenticatedPrincipalForJMSXUserID;
3002    }
3003
3004    public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
3005        this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
3006    }
3007
3008    /**
3009     * Should MBeans that support showing the Authenticated User Name information have this
3010     * value filled in or not.
3011     *
3012     * @return true if user names should be exposed in MBeans
3013     */
3014    public boolean isPopulateUserNameInMBeans() {
3015        return this.populateUserNameInMBeans;
3016    }
3017
3018    /**
3019     * Sets whether Authenticated User Name information is shown in MBeans that support this field.
3020     * @param value if MBeans should expose user name information.
3021     */
3022    public void setPopulateUserNameInMBeans(boolean value) {
3023        this.populateUserNameInMBeans = value;
3024    }
3025
3026    /**
3027     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
3028     * failing.  The default value is to wait forever (zero).
3029     *
3030     * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
3031     */
3032    public long getMbeanInvocationTimeout() {
3033        return mbeanInvocationTimeout;
3034    }
3035
3036    /**
3037     * Gets the time in Milliseconds that an invocation of an MBean method will wait before
3038     * failing. The default value is to wait forever (zero).
3039     *
3040     * @param mbeanInvocationTimeout
3041     *      timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
3042     */
3043    public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) {
3044        this.mbeanInvocationTimeout = mbeanInvocationTimeout;
3045    }
3046
3047    public boolean isNetworkConnectorStartAsync() {
3048        return networkConnectorStartAsync;
3049    }
3050
3051    public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
3052        this.networkConnectorStartAsync = networkConnectorStartAsync;
3053    }
3054
3055    public boolean isAllowTempAutoCreationOnSend() {
3056        return allowTempAutoCreationOnSend;
3057    }
3058
3059    /**
3060     * enable if temp destinations need to be propagated through a network when
3061     * advisorySupport==false. This is used in conjunction with the policy
3062     * gcInactiveDestinations for matching temps so they can get removed
3063     * when inactive
3064     *
3065     * @param allowTempAutoCreationOnSend
3066     */
3067    public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
3068        this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
3069    }
3070
3071    public long getOfflineDurableSubscriberTimeout() {
3072        return offlineDurableSubscriberTimeout;
3073    }
3074
3075    public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) {
3076        this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
3077    }
3078
3079    public long getOfflineDurableSubscriberTaskSchedule() {
3080        return offlineDurableSubscriberTaskSchedule;
3081    }
3082
3083    public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) {
3084        this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
3085    }
3086
3087    public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
3088        return isUseVirtualTopics() && destination.isQueue() &&
3089               getVirtualTopicConsumerDestinationFilter().matches(destination);
3090    }
3091
3092    synchronized public Throwable getStartException() {
3093        return startException;
3094    }
3095
3096    public boolean isStartAsync() {
3097        return startAsync;
3098    }
3099
3100    public void setStartAsync(boolean startAsync) {
3101        this.startAsync = startAsync;
3102    }
3103
3104    public boolean isSlave() {
3105        return this.slave;
3106    }
3107
3108    public boolean isStopping() {
3109        return this.stopping.get();
3110    }
3111
3112    /**
3113     * @return true if the broker allowed to restart on shutdown.
3114     */
3115    public boolean isRestartAllowed() {
3116        return restartAllowed;
3117    }
3118
3119    /**
3120     * Sets if the broker allowed to restart on shutdown.
3121     */
3122    public void setRestartAllowed(boolean restartAllowed) {
3123        this.restartAllowed = restartAllowed;
3124    }
3125
3126    /**
3127     * A lifecycle manager of the BrokerService should
3128     * inspect this property after a broker shutdown has occurred
3129     * to find out if the broker needs to be re-created and started
3130     * again.
3131     *
3132     * @return true if the broker wants to be restarted after it shuts down.
3133     */
3134    public boolean isRestartRequested() {
3135        return restartRequested;
3136    }
3137
3138    public void requestRestart() {
3139        this.restartRequested = true;
3140    }
3141
3142    public int getStoreOpenWireVersion() {
3143        return storeOpenWireVersion;
3144    }
3145
3146    public void setStoreOpenWireVersion(int storeOpenWireVersion) {
3147        this.storeOpenWireVersion = storeOpenWireVersion;
3148    }
3149
3150    /**
3151     * @return the current number of connections on this Broker.
3152     */
3153    public int getCurrentConnections() {
3154        return this.currentConnections.get();
3155    }
3156
3157    /**
3158     * @return the total number of connections this broker has handled since startup.
3159     */
3160    public long getTotalConnections() {
3161        return this.totalConnections.get();
3162    }
3163
3164    public void incrementCurrentConnections() {
3165        this.currentConnections.incrementAndGet();
3166    }
3167
3168    public void decrementCurrentConnections() {
3169        this.currentConnections.decrementAndGet();
3170    }
3171
3172    public void incrementTotalConnections() {
3173        this.totalConnections.incrementAndGet();
3174    }
3175
3176    public boolean isRejectDurableConsumers() {
3177        return rejectDurableConsumers;
3178    }
3179
3180    public void setRejectDurableConsumers(boolean rejectDurableConsumers) {
3181        this.rejectDurableConsumers = rejectDurableConsumers;
3182    }
3183
3184    public boolean isUseVirtualDestSubs() {
3185        return useVirtualDestSubs;
3186    }
3187
3188    public void setUseVirtualDestSubs(
3189            boolean useVirtualDestSubs) {
3190        this.useVirtualDestSubs = useVirtualDestSubs;
3191    }
3192
3193    public boolean isUseVirtualDestSubsOnCreation() {
3194        return useVirtualDestSubsOnCreation;
3195    }
3196
3197    public void setUseVirtualDestSubsOnCreation(
3198            boolean useVirtualDestSubsOnCreation) {
3199        this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation;
3200    }
3201
3202    public boolean isAdjustUsageLimits() {
3203        return adjustUsageLimits;
3204    }
3205
3206    public void setAdjustUsageLimits(boolean adjustUsageLimits) {
3207        this.adjustUsageLimits = adjustUsageLimits;
3208    }
3209
3210    public void setRollbackOnlyOnAsyncException(boolean rollbackOnlyOnAsyncException) {
3211        this.rollbackOnlyOnAsyncException = rollbackOnlyOnAsyncException;
3212    }
3213
3214    public boolean isRollbackOnlyOnAsyncException() {
3215        return rollbackOnlyOnAsyncException;
3216    }
3217}