001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.BufferedReader;
020    import java.io.File;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.InputStreamReader;
024    import java.net.URI;
025    import java.net.URISyntaxException;
026    import java.net.UnknownHostException;
027    import java.util.ArrayList;
028    import java.util.Date;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.Iterator;
032    import java.util.List;
033    import java.util.Locale;
034    import java.util.Map;
035    import java.util.Set;
036    import java.util.concurrent.CopyOnWriteArrayList;
037    import java.util.concurrent.CountDownLatch;
038    import java.util.concurrent.LinkedBlockingQueue;
039    import java.util.concurrent.RejectedExecutionException;
040    import java.util.concurrent.RejectedExecutionHandler;
041    import java.util.concurrent.SynchronousQueue;
042    import java.util.concurrent.ThreadFactory;
043    import java.util.concurrent.ThreadPoolExecutor;
044    import java.util.concurrent.TimeUnit;
045    import java.util.concurrent.atomic.AtomicBoolean;
046    
047    import javax.annotation.PostConstruct;
048    import javax.annotation.PreDestroy;
049    import javax.management.MalformedObjectNameException;
050    import javax.management.ObjectName;
051    
052    import org.apache.activemq.ActiveMQConnectionMetaData;
053    import org.apache.activemq.ConfigurationException;
054    import org.apache.activemq.Service;
055    import org.apache.activemq.advisory.AdvisoryBroker;
056    import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
057    import org.apache.activemq.broker.jmx.AnnotatedMBean;
058    import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
059    import org.apache.activemq.broker.jmx.BrokerView;
060    import org.apache.activemq.broker.jmx.ConnectorView;
061    import org.apache.activemq.broker.jmx.ConnectorViewMBean;
062    import org.apache.activemq.broker.jmx.HealthView;
063    import org.apache.activemq.broker.jmx.HealthViewMBean;
064    import org.apache.activemq.broker.jmx.JmsConnectorView;
065    import org.apache.activemq.broker.jmx.JobSchedulerView;
066    import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
067    import org.apache.activemq.broker.jmx.ManagedRegionBroker;
068    import org.apache.activemq.broker.jmx.ManagementContext;
069    import org.apache.activemq.broker.jmx.NetworkConnectorView;
070    import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
071    import org.apache.activemq.broker.jmx.ProxyConnectorView;
072    import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
073    import org.apache.activemq.broker.region.Destination;
074    import org.apache.activemq.broker.region.DestinationFactory;
075    import org.apache.activemq.broker.region.DestinationFactoryImpl;
076    import org.apache.activemq.broker.region.DestinationInterceptor;
077    import org.apache.activemq.broker.region.RegionBroker;
078    import org.apache.activemq.broker.region.policy.PolicyMap;
079    import org.apache.activemq.broker.region.virtual.MirroredQueue;
080    import org.apache.activemq.broker.region.virtual.VirtualDestination;
081    import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
082    import org.apache.activemq.broker.region.virtual.VirtualTopic;
083    import org.apache.activemq.broker.scheduler.JobSchedulerStore;
084    import org.apache.activemq.broker.scheduler.SchedulerBroker;
085    import org.apache.activemq.command.ActiveMQDestination;
086    import org.apache.activemq.command.ActiveMQQueue;
087    import org.apache.activemq.command.BrokerId;
088    import org.apache.activemq.filter.DestinationFilter;
089    import org.apache.activemq.network.ConnectionFilter;
090    import org.apache.activemq.network.DiscoveryNetworkConnector;
091    import org.apache.activemq.network.NetworkConnector;
092    import org.apache.activemq.network.jms.JmsConnector;
093    import org.apache.activemq.proxy.ProxyConnector;
094    import org.apache.activemq.security.MessageAuthorizationPolicy;
095    import org.apache.activemq.selector.SelectorParser;
096    import org.apache.activemq.store.JournaledStore;
097    import org.apache.activemq.store.PListStore;
098    import org.apache.activemq.store.PersistenceAdapter;
099    import org.apache.activemq.store.PersistenceAdapterFactory;
100    import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
101    import org.apache.activemq.thread.Scheduler;
102    import org.apache.activemq.thread.TaskRunnerFactory;
103    import org.apache.activemq.transport.TransportFactorySupport;
104    import org.apache.activemq.transport.TransportServer;
105    import org.apache.activemq.transport.vm.VMTransportFactory;
106    import org.apache.activemq.usage.SystemUsage;
107    import org.apache.activemq.util.BrokerSupport;
108    import org.apache.activemq.util.DefaultIOExceptionHandler;
109    import org.apache.activemq.util.IOExceptionHandler;
110    import org.apache.activemq.util.IOExceptionSupport;
111    import org.apache.activemq.util.IOHelper;
112    import org.apache.activemq.util.InetAddressUtil;
113    import org.apache.activemq.util.ServiceStopper;
114    import org.apache.activemq.util.ThreadPoolUtils;
115    import org.apache.activemq.util.TimeUtils;
116    import org.apache.activemq.util.URISupport;
117    import org.slf4j.Logger;
118    import org.slf4j.LoggerFactory;
119    import org.slf4j.MDC;
120    
121    /**
122     * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
123     * number of transport connectors, network connectors and a bunch of properties
124     * which can be used to configure the broker as its lazily created.
125     *
126     *
127     * @org.apache.xbean.XBean
128     */
129    public class BrokerService implements Service {
130        public static final String DEFAULT_PORT = "61616";
131        public static final String LOCAL_HOST_NAME;
132        public static final String BROKER_VERSION;
133        public static final String DEFAULT_BROKER_NAME = "localhost";
134        public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
135    
136        private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
137        private static final long serialVersionUID = 7353129142305630237L;
138        private boolean useJmx = true;
139        private boolean enableStatistics = true;
140        private boolean persistent = true;
141        private boolean populateJMSXUserID;
142        private boolean useAuthenticatedPrincipalForJMSXUserID;
143        private boolean populateUserNameInMBeans;
144        private long mbeanInvocationTimeout = 0;
145    
146        private boolean useShutdownHook = true;
147        private boolean useLoggingForShutdownErrors;
148        private boolean shutdownOnMasterFailure;
149        private boolean shutdownOnSlaveFailure;
150        private boolean waitForSlave;
151        private long waitForSlaveTimeout = 600000L;
152        private boolean passiveSlave;
153        private String brokerName = DEFAULT_BROKER_NAME;
154        private File dataDirectoryFile;
155        private File tmpDataDirectory;
156        private Broker broker;
157        private BrokerView adminView;
158        private ManagementContext managementContext;
159        private ObjectName brokerObjectName;
160        private TaskRunnerFactory taskRunnerFactory;
161        private TaskRunnerFactory persistenceTaskRunnerFactory;
162        private SystemUsage systemUsage;
163        private SystemUsage producerSystemUsage;
164        private SystemUsage consumerSystemUsaage;
165        private PersistenceAdapter persistenceAdapter;
166        private PersistenceAdapterFactory persistenceFactory;
167        protected DestinationFactory destinationFactory;
168        private MessageAuthorizationPolicy messageAuthorizationPolicy;
169        private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
170        private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
171        private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
172        private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
173        private final List<Service> services = new ArrayList<Service>();
174        private transient Thread shutdownHook;
175        private String[] transportConnectorURIs;
176        private String[] networkConnectorURIs;
177        private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
178        // to other jms messaging systems
179        private boolean deleteAllMessagesOnStartup;
180        private boolean advisorySupport = true;
181        private URI vmConnectorURI;
182        private String defaultSocketURIString;
183        private PolicyMap destinationPolicy;
184        private final AtomicBoolean started = new AtomicBoolean(false);
185        private final AtomicBoolean stopped = new AtomicBoolean(false);
186        private final AtomicBoolean stopping = new AtomicBoolean(false);
187        private BrokerPlugin[] plugins;
188        private boolean keepDurableSubsActive = true;
189        private boolean useVirtualTopics = true;
190        private boolean useMirroredQueues = false;
191        private boolean useTempMirroredQueues = true;
192        private BrokerId brokerId;
193        private DestinationInterceptor[] destinationInterceptors;
194        private ActiveMQDestination[] destinations;
195        private PListStore tempDataStore;
196        private int persistenceThreadPriority = Thread.MAX_PRIORITY;
197        private boolean useLocalHostBrokerName;
198        private final CountDownLatch stoppedLatch = new CountDownLatch(1);
199        private final CountDownLatch startedLatch = new CountDownLatch(1);
200        private boolean supportFailOver;
201        private Broker regionBroker;
202        private int producerSystemUsagePortion = 60;
203        private int consumerSystemUsagePortion = 40;
204        private boolean splitSystemUsageForProducersConsumers;
205        private boolean monitorConnectionSplits = false;
206        private int taskRunnerPriority = Thread.NORM_PRIORITY;
207        private boolean dedicatedTaskRunner;
208        private boolean cacheTempDestinations = false;// useful for failover
209        private int timeBeforePurgeTempDestinations = 5000;
210        private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
211        private boolean systemExitOnShutdown;
212        private int systemExitOnShutdownExitCode;
213        private SslContext sslContext;
214        private boolean forceStart = false;
215        private IOExceptionHandler ioExceptionHandler;
216        private boolean schedulerSupport = false;
217        private File schedulerDirectoryFile;
218        private Scheduler scheduler;
219        private ThreadPoolExecutor executor;
220        private int schedulePeriodForDestinationPurge= 0;
221        private int maxPurgedDestinationsPerSweep = 0;
222        private BrokerContext brokerContext;
223        private boolean networkConnectorStartAsync = false;
224        private boolean allowTempAutoCreationOnSend;
225        private JobSchedulerStore jobSchedulerStore;
226    
227        private long offlineDurableSubscriberTimeout = -1;
228        private long offlineDurableSubscriberTaskSchedule = 300000;
229        private DestinationFilter virtualConsumerDestinationFilter;
230    
231        private final Object persistenceAdapterLock = new Object();
232        private Throwable startException = null;
233        private boolean startAsync = false;
234        private Date startDate;
235    
236        static {
237            String localHostName = "localhost";
238            try {
239                localHostName =  InetAddressUtil.getLocalHostName();
240            } catch (UnknownHostException e) {
241                LOG.error("Failed to resolve localhost");
242            }
243            LOCAL_HOST_NAME = localHostName;
244    
245            InputStream in = null;
246            String version = null;
247            if ((in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
248                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
249                try {
250                    version = reader.readLine();
251                } catch(Exception e) {
252                }
253            }
254            BROKER_VERSION = version;
255        }
256    
257        @Override
258        public String toString() {
259            return "BrokerService[" + getBrokerName() + "]";
260        }
261    
262        private String getBrokerVersion() {
263            String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
264            if (version == null) {
265                version = BROKER_VERSION;
266            }
267    
268            return version;
269        }
270    
271        /**
272         * Adds a new transport connector for the given bind address
273         *
274         * @return the newly created and added transport connector
275         * @throws Exception
276         */
277        public TransportConnector addConnector(String bindAddress) throws Exception {
278            return addConnector(new URI(bindAddress));
279        }
280    
281        /**
282         * Adds a new transport connector for the given bind address
283         *
284         * @return the newly created and added transport connector
285         * @throws Exception
286         */
287        public TransportConnector addConnector(URI bindAddress) throws Exception {
288            return addConnector(createTransportConnector(bindAddress));
289        }
290    
291        /**
292         * Adds a new transport connector for the given TransportServer transport
293         *
294         * @return the newly created and added transport connector
295         * @throws Exception
296         */
297        public TransportConnector addConnector(TransportServer transport) throws Exception {
298            return addConnector(new TransportConnector(transport));
299        }
300    
301        /**
302         * Adds a new transport connector
303         *
304         * @return the transport connector
305         * @throws Exception
306         */
307        public TransportConnector addConnector(TransportConnector connector) throws Exception {
308            transportConnectors.add(connector);
309            return connector;
310        }
311    
312        /**
313         * Stops and removes a transport connector from the broker.
314         *
315         * @param connector
316         * @return true if the connector has been previously added to the broker
317         * @throws Exception
318         */
319        public boolean removeConnector(TransportConnector connector) throws Exception {
320            boolean rc = transportConnectors.remove(connector);
321            if (rc) {
322                unregisterConnectorMBean(connector);
323            }
324            return rc;
325        }
326    
327        /**
328         * Adds a new network connector using the given discovery address
329         *
330         * @return the newly created and added network connector
331         * @throws Exception
332         */
333        public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
334            return addNetworkConnector(new URI(discoveryAddress));
335        }
336    
337        /**
338         * Adds a new proxy connector using the given bind address
339         *
340         * @return the newly created and added network connector
341         * @throws Exception
342         */
343        public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
344            return addProxyConnector(new URI(bindAddress));
345        }
346    
347        /**
348         * Adds a new network connector using the given discovery address
349         *
350         * @return the newly created and added network connector
351         * @throws Exception
352         */
353        public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
354            NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
355            return addNetworkConnector(connector);
356        }
357    
358        /**
359         * Adds a new proxy connector using the given bind address
360         *
361         * @return the newly created and added network connector
362         * @throws Exception
363         */
364        public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
365            ProxyConnector connector = new ProxyConnector();
366            connector.setBind(bindAddress);
367            connector.setRemote(new URI("fanout:multicast://default"));
368            return addProxyConnector(connector);
369        }
370    
371        /**
372         * Adds a new network connector to connect this broker to a federated
373         * network
374         */
375        public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
376            connector.setBrokerService(this);
377            URI uri = getVmConnectorURI();
378            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
379            map.put("network", "true");
380            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
381            connector.setLocalUri(uri);
382            // Set a connection filter so that the connector does not establish loop
383            // back connections.
384            connector.setConnectionFilter(new ConnectionFilter() {
385                @Override
386                public boolean connectTo(URI location) {
387                    List<TransportConnector> transportConnectors = getTransportConnectors();
388                    for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
389                        try {
390                            TransportConnector tc = iter.next();
391                            if (location.equals(tc.getConnectUri())) {
392                                return false;
393                            }
394                        } catch (Throwable e) {
395                        }
396                    }
397                    return true;
398                }
399            });
400            networkConnectors.add(connector);
401            if (isUseJmx()) {
402                registerNetworkConnectorMBean(connector);
403            }
404            return connector;
405        }
406    
407        /**
408         * Removes the given network connector without stopping it. The caller
409         * should call {@link NetworkConnector#stop()} to close the connector
410         */
411        public boolean removeNetworkConnector(NetworkConnector connector) {
412            boolean answer = networkConnectors.remove(connector);
413            if (answer) {
414                unregisterNetworkConnectorMBean(connector);
415            }
416            return answer;
417        }
418    
419        public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
420            URI uri = getVmConnectorURI();
421            connector.setLocalUri(uri);
422            proxyConnectors.add(connector);
423            if (isUseJmx()) {
424                registerProxyConnectorMBean(connector);
425            }
426            return connector;
427        }
428    
429        public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
430            connector.setBrokerService(this);
431            jmsConnectors.add(connector);
432            if (isUseJmx()) {
433                registerJmsConnectorMBean(connector);
434            }
435            return connector;
436        }
437    
438        public JmsConnector removeJmsConnector(JmsConnector connector) {
439            if (jmsConnectors.remove(connector)) {
440                return connector;
441            }
442            return null;
443        }
444    
445        public void masterFailed() {
446            if (shutdownOnMasterFailure) {
447                LOG.error("The Master has failed ... shutting down");
448                try {
449                    stop();
450                } catch (Exception e) {
451                    LOG.error("Failed to stop for master failure", e);
452                }
453            } else {
454                LOG.warn("Master Failed - starting all connectors");
455                try {
456                    startAllConnectors();
457                    broker.nowMasterBroker();
458                } catch (Exception e) {
459                    LOG.error("Failed to startAllConnectors", e);
460                }
461            }
462        }
463    
464        public String getUptime() {
465            // compute and log uptime
466            if (startDate == null) {
467                return "not started";
468            }
469            long delta = new Date().getTime() - startDate.getTime();
470            return TimeUtils.printDuration(delta);
471        }
472    
473        public boolean isStarted() {
474            return started.get() && startedLatch.getCount() == 0;
475        }
476    
477        /**
478         * Forces a start of the broker.
479         * By default a BrokerService instance that was
480         * previously stopped using BrokerService.stop() cannot be restarted
481         * using BrokerService.start().
482         * This method enforces a restart.
483         * It is not recommended to force a restart of the broker and will not work
484         * for most but some very trivial broker configurations.
485         * For restarting a broker instance we recommend to first call stop() on
486         * the old instance and then recreate a new BrokerService instance.
487         *
488         * @param force - if true enforces a restart.
489         * @throws Exception
490         */
491        public void start(boolean force) throws Exception {
492            forceStart = force;
493            stopped.set(false);
494            started.set(false);
495            start();
496        }
497    
498        // Service interface
499        // -------------------------------------------------------------------------
500    
501        protected boolean shouldAutostart() {
502            return true;
503        }
504    
505        /**
506         *
507         * @throws Exception
508         * @org. apache.xbean.InitMethod
509         */
510        @PostConstruct
511        public void autoStart() throws Exception {
512            if(shouldAutostart()) {
513                start();
514            }
515        }
516    
517        @Override
518        public void start() throws Exception {
519            if (stopped.get() || !started.compareAndSet(false, true)) {
520                // lets just ignore redundant start() calls
521                // as its way too easy to not be completely sure if start() has been
522                // called or not with the gazillion of different configuration
523                // mechanisms
524                // throw new IllegalStateException("Already started.");
525                return;
526            }
527    
528            stopping.set(false);
529            startDate = new Date();
530            MDC.put("activemq.broker", brokerName);
531    
532            try {
533                if (systemExitOnShutdown && useShutdownHook) {
534                    throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
535                }
536                processHelperProperties();
537                if (isUseJmx()) {
538                    // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and
539                    // we cannot cleanup clear that during shutdown of the broker.
540                    MDC.remove("activemq.broker");
541                    try {
542                        startManagementContext();
543                    } finally {
544                        MDC.put("activemq.broker", brokerName);
545                    }
546                }
547                // in jvm master slave, lets not publish over existing broker till we get the lock
548                final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
549                if (brokerRegistry.lookup(getBrokerName()) == null) {
550                    brokerRegistry.bind(getBrokerName(), BrokerService.this);
551                }
552                startPersistenceAdapter(startAsync);
553                startBroker(startAsync);
554                brokerRegistry.bind(getBrokerName(), BrokerService.this);
555            } catch (Exception e) {
556                LOG.error("Failed to start Apache ActiveMQ (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e);
557                try {
558                    if (!stopped.get()) {
559                        stop();
560                    }
561                } catch (Exception ex) {
562                    LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
563                }
564                throw e;
565            } finally {
566                MDC.remove("activemq.broker");
567            }
568        }
569    
570        private void startPersistenceAdapter(boolean async) throws Exception {
571            if (async) {
572                new Thread("Persistence Adapter Starting Thread") {
573                    @Override
574                    public void run() {
575                        try {
576                            doStartPersistenceAdapter();
577                        } catch (Throwable e) {
578                            startException = e;
579                        } finally {
580                            synchronized (persistenceAdapterLock) {
581                                persistenceAdapterLock.notifyAll();
582                            }
583                        }
584                    }
585                }.start();
586            } else {
587                doStartPersistenceAdapter();
588            }
589        }
590    
591        private void doStartPersistenceAdapter() throws Exception {
592            getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
593            getPersistenceAdapter().setBrokerName(getBrokerName());
594            LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
595            if (deleteAllMessagesOnStartup) {
596                deleteAllMessages();
597            }
598            getPersistenceAdapter().start();
599        }
600    
601        private void startBroker(boolean async) throws Exception {
602            if (async) {
603                new Thread("Broker Starting Thread") {
604                    @Override
605                    public void run() {
606                        try {
607                            synchronized (persistenceAdapterLock) {
608                                persistenceAdapterLock.wait();
609                            }
610                            doStartBroker();
611                        } catch (Throwable t) {
612                            startException = t;
613                        }
614                    }
615                }.start();
616            } else {
617                doStartBroker();
618            }
619        }
620    
621        private void doStartBroker() throws Exception {
622            if (startException != null) {
623                return;
624            }
625            startDestinations();
626            addShutdownHook();
627    
628            broker = getBroker();
629            brokerId = broker.getBrokerId();
630    
631            // need to log this after creating the broker so we have its id and name
632            if (LOG.isInfoEnabled()) {
633                LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
634                        + getBrokerName() + ", " + brokerId + ") is starting");
635            }
636            broker.start();
637    
638            if (isUseJmx()) {
639                if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
640                    // try to restart management context
641                    // typical for slaves that use the same ports as master
642                    managementContext.stop();
643                    startManagementContext();
644                }
645                ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
646                managedBroker.setContextBroker(broker);
647                adminView.setBroker(managedBroker);
648            }
649    
650            startAllConnectors();
651    
652            if (ioExceptionHandler == null) {
653                setIoExceptionHandler(new DefaultIOExceptionHandler());
654            }
655    
656            if (LOG.isInfoEnabled()) {
657                LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
658                        + getBrokerName() + ", " + brokerId + ") started");
659                LOG.info("For help or more information please see: http://activemq.apache.org");
660            }
661    
662            getBroker().brokerServiceStarted();
663            checkSystemUsageLimits();
664            startedLatch.countDown();
665            getBroker().nowMasterBroker();
666        }
667    
668        /**
669         *
670         * @throws Exception
671         * @org.apache .xbean.DestroyMethod
672         */
673        @Override
674        @PreDestroy
675        public void stop() throws Exception {
676            if (!stopping.compareAndSet(false, true)) {
677                LOG.trace("Broker already stopping/stopped");
678                return;
679            }
680    
681            MDC.put("activemq.broker", brokerName);
682    
683            if (systemExitOnShutdown) {
684                new Thread() {
685                    @Override
686                    public void run() {
687                        System.exit(systemExitOnShutdownExitCode);
688                    }
689                }.start();
690            }
691    
692            if (LOG.isInfoEnabled()) {
693                LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
694                        + getBrokerName() + ", " + brokerId + ") is shutting down");
695            }
696    
697            removeShutdownHook();
698            if (this.scheduler != null) {
699                this.scheduler.stop();
700                this.scheduler = null;
701            }
702            ServiceStopper stopper = new ServiceStopper();
703            if (services != null) {
704                for (Service service : services) {
705                    stopper.stop(service);
706                }
707            }
708            stopAllConnectors(stopper);
709            // remove any VMTransports connected
710            // this has to be done after services are stopped,
711            // to avoid timing issue with discovery (spinning up a new instance)
712            BrokerRegistry.getInstance().unbind(getBrokerName());
713            VMTransportFactory.stopped(getBrokerName());
714            if (broker != null) {
715                stopper.stop(broker);
716                broker = null;
717            }
718    
719            if (jobSchedulerStore != null) {
720                jobSchedulerStore.stop();
721                jobSchedulerStore = null;
722            }
723            if (tempDataStore != null) {
724                tempDataStore.stop();
725                tempDataStore = null;
726            }
727            try {
728                stopper.stop(persistenceAdapter);
729                persistenceAdapter = null;
730                if (isUseJmx()) {
731                    stopper.stop(getManagementContext());
732                    managementContext = null;
733                }
734                // Clear SelectorParser cache to free memory
735                SelectorParser.clearCache();
736            } finally {
737                started.set(false);
738                stopped.set(true);
739                stoppedLatch.countDown();
740            }
741    
742            if (this.taskRunnerFactory != null) {
743                this.taskRunnerFactory.shutdown();
744                this.taskRunnerFactory = null;
745            }
746            if (this.executor != null) {
747                ThreadPoolUtils.shutdownNow(executor);
748                this.executor = null;
749            }
750    
751            this.destinationInterceptors = null;
752            this.destinationFactory = null;
753    
754            if (LOG.isInfoEnabled()) {
755                if (startDate != null) {
756                    LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
757                            + getBrokerName() + ", " + brokerId + ") uptime " + getUptime());
758                }
759                LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
760                        + getBrokerName() + ", " + brokerId + ") is shutdown");
761            }
762    
763            synchronized (shutdownHooks) {
764                for (Runnable hook : shutdownHooks) {
765                    try {
766                        hook.run();
767                    } catch (Throwable e) {
768                        stopper.onException(hook, e);
769                    }
770                }
771            }
772    
773            MDC.remove("activemq.broker");
774    
775            // and clear start date
776            startDate = null;
777    
778            stopper.throwFirstException();
779        }
780    
781        public boolean checkQueueSize(String queueName) {
782            long count = 0;
783            long queueSize = 0;
784            Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
785            for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
786                if (entry.getKey().isQueue()) {
787                    if (entry.getValue().getName().matches(queueName)) {
788                        queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
789                        count += queueSize;
790                        if (queueSize > 0) {
791                            LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:"
792                                    + queueSize);
793                        }
794                    }
795                }
796            }
797            return count == 0;
798        }
799    
800        /**
801         * This method (both connectorName and queueName are using regex to match)
802         * 1. stop the connector (supposed the user input the connector which the
803         * clients connect to) 2. to check whether there is any pending message on
804         * the queues defined by queueName 3. supposedly, after stop the connector,
805         * client should failover to other broker and pending messages should be
806         * forwarded. if no pending messages, the method finally call stop to stop
807         * the broker.
808         *
809         * @param connectorName
810         * @param queueName
811         * @param timeout
812         * @param pollInterval
813         * @throws Exception
814         */
815        public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
816            if (isUseJmx()) {
817                if (connectorName == null || queueName == null || timeout <= 0) {
818                    throw new Exception(
819                            "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
820                }
821                if (pollInterval <= 0) {
822                    pollInterval = 30;
823                }
824                LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:"
825                        + timeout + " pollInterval:" + pollInterval);
826                TransportConnector connector;
827                for (int i = 0; i < transportConnectors.size(); i++) {
828                    connector = transportConnectors.get(i);
829                    if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
830                        connector.stop();
831                    }
832                }
833                long start = System.currentTimeMillis();
834                while (System.currentTimeMillis() - start < timeout * 1000) {
835                    // check quesize until it gets zero
836                    if (checkQueueSize(queueName)) {
837                        stop();
838                        break;
839                    } else {
840                        Thread.sleep(pollInterval * 1000);
841                    }
842                }
843                if (stopped.get()) {
844                    LOG.info("Successfully stop the broker.");
845                } else {
846                    LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
847                }
848            }
849        }
850    
851        /**
852         * A helper method to block the caller thread until the broker has been
853         * stopped
854         */
855        public void waitUntilStopped() {
856            while (isStarted() && !stopped.get()) {
857                try {
858                    stoppedLatch.await();
859                } catch (InterruptedException e) {
860                    // ignore
861                }
862            }
863        }
864    
865        /**
866         * A helper method to block the caller thread until the broker has fully started
867         * @return boolean true if wait succeeded false if broker was not started or was stopped
868         */
869        public boolean waitUntilStarted() {
870            boolean waitSucceeded = isStarted();
871            while (!isStarted() && !stopped.get() && !waitSucceeded) {
872                try {
873                    if (startException != null) {
874                        return waitSucceeded;
875                    }
876                    waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
877                } catch (InterruptedException ignore) {
878                }
879            }
880            return waitSucceeded;
881        }
882    
883        // Properties
884        // -------------------------------------------------------------------------
885        /**
886         * Returns the message broker
887         */
888        public Broker getBroker() throws Exception {
889            if (broker == null) {
890                broker = createBroker();
891            }
892            return broker;
893        }
894    
895        /**
896         * Returns the administration view of the broker; used to create and destroy
897         * resources such as queues and topics. Note this method returns null if JMX
898         * is disabled.
899         */
900        public BrokerView getAdminView() throws Exception {
901            if (adminView == null) {
902                // force lazy creation
903                getBroker();
904            }
905            return adminView;
906        }
907    
908        public void setAdminView(BrokerView adminView) {
909            this.adminView = adminView;
910        }
911    
912        public String getBrokerName() {
913            return brokerName;
914        }
915    
916        /**
917         * Sets the name of this broker; which must be unique in the network
918         *
919         * @param brokerName
920         */
921        public void setBrokerName(String brokerName) {
922            if (brokerName == null) {
923                throw new NullPointerException("The broker name cannot be null");
924            }
925            String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
926            if (!str.equals(brokerName)) {
927                LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
928            }
929            this.brokerName = str.trim();
930        }
931    
932        public PersistenceAdapterFactory getPersistenceFactory() {
933            return persistenceFactory;
934        }
935    
936        public File getDataDirectoryFile() {
937            if (dataDirectoryFile == null) {
938                dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
939            }
940            return dataDirectoryFile;
941        }
942    
943        public File getBrokerDataDirectory() {
944            String brokerDir = getBrokerName();
945            return new File(getDataDirectoryFile(), brokerDir);
946        }
947    
948        /**
949         * Sets the directory in which the data files will be stored by default for
950         * the JDBC and Journal persistence adaptors.
951         *
952         * @param dataDirectory
953         *            the directory to store data files
954         */
955        public void setDataDirectory(String dataDirectory) {
956            setDataDirectoryFile(new File(dataDirectory));
957        }
958    
959        /**
960         * Sets the directory in which the data files will be stored by default for
961         * the JDBC and Journal persistence adaptors.
962         *
963         * @param dataDirectoryFile
964         *            the directory to store data files
965         */
966        public void setDataDirectoryFile(File dataDirectoryFile) {
967            this.dataDirectoryFile = dataDirectoryFile;
968        }
969    
970        /**
971         * @return the tmpDataDirectory
972         */
973        public File getTmpDataDirectory() {
974            if (tmpDataDirectory == null) {
975                tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
976            }
977            return tmpDataDirectory;
978        }
979    
980        /**
981         * @param tmpDataDirectory
982         *            the tmpDataDirectory to set
983         */
984        public void setTmpDataDirectory(File tmpDataDirectory) {
985            this.tmpDataDirectory = tmpDataDirectory;
986        }
987    
988        public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
989            this.persistenceFactory = persistenceFactory;
990        }
991    
992        public void setDestinationFactory(DestinationFactory destinationFactory) {
993            this.destinationFactory = destinationFactory;
994        }
995    
996        public boolean isPersistent() {
997            return persistent;
998        }
999    
1000        /**
1001         * Sets whether or not persistence is enabled or disabled.
1002         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1003         */
1004        public void setPersistent(boolean persistent) {
1005            this.persistent = persistent;
1006        }
1007    
1008        public boolean isPopulateJMSXUserID() {
1009            return populateJMSXUserID;
1010        }
1011    
1012        /**
1013         * Sets whether or not the broker should populate the JMSXUserID header.
1014         */
1015        public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
1016            this.populateJMSXUserID = populateJMSXUserID;
1017        }
1018    
1019        public SystemUsage getSystemUsage() {
1020            try {
1021                if (systemUsage == null) {
1022    
1023                    systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
1024                    systemUsage.setExecutor(getExecutor());
1025                    systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // 64 MB
1026                    systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1027                    systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
1028                    systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1000 * 50); // 50 // Gb
1029                    addService(this.systemUsage);
1030                }
1031                return systemUsage;
1032            } catch (IOException e) {
1033                LOG.error("Cannot create SystemUsage", e);
1034                throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
1035            }
1036        }
1037    
1038        public void setSystemUsage(SystemUsage memoryManager) {
1039            if (this.systemUsage != null) {
1040                removeService(this.systemUsage);
1041            }
1042            this.systemUsage = memoryManager;
1043            if (this.systemUsage.getExecutor()==null) {
1044                this.systemUsage.setExecutor(getExecutor());
1045            }
1046            addService(this.systemUsage);
1047        }
1048    
1049        /**
1050         * @return the consumerUsageManager
1051         * @throws IOException
1052         */
1053        public SystemUsage getConsumerSystemUsage() throws IOException {
1054            if (this.consumerSystemUsaage == null) {
1055                if (splitSystemUsageForProducersConsumers) {
1056                    this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
1057                    float portion = consumerSystemUsagePortion / 100f;
1058                    this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
1059                    addService(this.consumerSystemUsaage);
1060                } else {
1061                    consumerSystemUsaage = getSystemUsage();
1062                }
1063            }
1064            return this.consumerSystemUsaage;
1065        }
1066    
1067        /**
1068         * @param consumerSystemUsaage
1069         *            the storeSystemUsage to set
1070         */
1071        public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
1072            if (this.consumerSystemUsaage != null) {
1073                removeService(this.consumerSystemUsaage);
1074            }
1075            this.consumerSystemUsaage = consumerSystemUsaage;
1076            addService(this.consumerSystemUsaage);
1077        }
1078    
1079        /**
1080         * @return the producerUsageManager
1081         * @throws IOException
1082         */
1083        public SystemUsage getProducerSystemUsage() throws IOException {
1084            if (producerSystemUsage == null) {
1085                if (splitSystemUsageForProducersConsumers) {
1086                    producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
1087                    float portion = producerSystemUsagePortion / 100f;
1088                    producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
1089                    addService(producerSystemUsage);
1090                } else {
1091                    producerSystemUsage = getSystemUsage();
1092                }
1093            }
1094            return producerSystemUsage;
1095        }
1096    
1097        /**
1098         * @param producerUsageManager
1099         *            the producerUsageManager to set
1100         */
1101        public void setProducerSystemUsage(SystemUsage producerUsageManager) {
1102            if (this.producerSystemUsage != null) {
1103                removeService(this.producerSystemUsage);
1104            }
1105            this.producerSystemUsage = producerUsageManager;
1106            addService(this.producerSystemUsage);
1107        }
1108    
1109        public PersistenceAdapter getPersistenceAdapter() throws IOException {
1110            if (persistenceAdapter == null) {
1111                persistenceAdapter = createPersistenceAdapter();
1112                configureService(persistenceAdapter);
1113                this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1114            }
1115            return persistenceAdapter;
1116        }
1117    
1118        /**
1119         * Sets the persistence adaptor implementation to use for this broker
1120         *
1121         * @throws IOException
1122         */
1123        public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1124            if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) {
1125                LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: " + persistenceAdapter);
1126                return;
1127            }
1128            this.persistenceAdapter = persistenceAdapter;
1129            configureService(this.persistenceAdapter);
1130            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1131        }
1132    
1133        public TaskRunnerFactory getTaskRunnerFactory() {
1134            if (this.taskRunnerFactory == null) {
1135                this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1136                        isDedicatedTaskRunner());
1137            }
1138            return this.taskRunnerFactory;
1139        }
1140    
1141        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
1142            this.taskRunnerFactory = taskRunnerFactory;
1143        }
1144    
1145        public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1146            if (taskRunnerFactory == null) {
1147                persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1148                        true, 1000, isDedicatedTaskRunner());
1149            }
1150            return persistenceTaskRunnerFactory;
1151        }
1152    
1153        public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
1154            this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
1155        }
1156    
1157        public boolean isUseJmx() {
1158            return useJmx;
1159        }
1160    
1161        public boolean isEnableStatistics() {
1162            return enableStatistics;
1163        }
1164    
1165        /**
1166         * Sets whether or not the Broker's services enable statistics or not.
1167         */
1168        public void setEnableStatistics(boolean enableStatistics) {
1169            this.enableStatistics = enableStatistics;
1170        }
1171    
1172        /**
1173         * Sets whether or not the Broker's services should be exposed into JMX or
1174         * not.
1175         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1176         */
1177        public void setUseJmx(boolean useJmx) {
1178            this.useJmx = useJmx;
1179        }
1180    
1181        public ObjectName getBrokerObjectName() throws MalformedObjectNameException {
1182            if (brokerObjectName == null) {
1183                brokerObjectName = createBrokerObjectName();
1184            }
1185            return brokerObjectName;
1186        }
1187    
1188        /**
1189         * Sets the JMX ObjectName for this broker
1190         */
1191        public void setBrokerObjectName(ObjectName brokerObjectName) {
1192            this.brokerObjectName = brokerObjectName;
1193        }
1194    
1195        public ManagementContext getManagementContext() {
1196            if (managementContext == null) {
1197                managementContext = new ManagementContext();
1198            }
1199            return managementContext;
1200        }
1201    
1202        public void setManagementContext(ManagementContext managementContext) {
1203            this.managementContext = managementContext;
1204        }
1205    
1206        public NetworkConnector getNetworkConnectorByName(String connectorName) {
1207            for (NetworkConnector connector : networkConnectors) {
1208                if (connector.getName().equals(connectorName)) {
1209                    return connector;
1210                }
1211            }
1212            return null;
1213        }
1214    
1215        public String[] getNetworkConnectorURIs() {
1216            return networkConnectorURIs;
1217        }
1218    
1219        public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1220            this.networkConnectorURIs = networkConnectorURIs;
1221        }
1222    
1223        public TransportConnector getConnectorByName(String connectorName) {
1224            for (TransportConnector connector : transportConnectors) {
1225                if (connector.getName().equals(connectorName)) {
1226                    return connector;
1227                }
1228            }
1229            return null;
1230        }
1231    
1232        public Map<String, String> getTransportConnectorURIsAsMap() {
1233            Map<String, String> answer = new HashMap<String, String>();
1234            for (TransportConnector connector : transportConnectors) {
1235                try {
1236                    URI uri = connector.getConnectUri();
1237                    if (uri != null) {
1238                        String scheme = uri.getScheme();
1239                        if (scheme != null) {
1240                            answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString());
1241                        }
1242                    }
1243                } catch (Exception e) {
1244                    LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1245                }
1246            }
1247            return answer;
1248        }
1249    
1250        public String[] getTransportConnectorURIs() {
1251            return transportConnectorURIs;
1252        }
1253    
1254        public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1255            this.transportConnectorURIs = transportConnectorURIs;
1256        }
1257    
1258        /**
1259         * @return Returns the jmsBridgeConnectors.
1260         */
1261        public JmsConnector[] getJmsBridgeConnectors() {
1262            return jmsBridgeConnectors;
1263        }
1264    
1265        /**
1266         * @param jmsConnectors
1267         *            The jmsBridgeConnectors to set.
1268         */
1269        public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1270            this.jmsBridgeConnectors = jmsConnectors;
1271        }
1272    
1273        public Service[] getServices() {
1274            return services.toArray(new Service[0]);
1275        }
1276    
1277        /**
1278         * Sets the services associated with this broker.
1279         */
1280        public void setServices(Service[] services) {
1281            this.services.clear();
1282            if (services != null) {
1283                for (int i = 0; i < services.length; i++) {
1284                    this.services.add(services[i]);
1285                }
1286            }
1287        }
1288    
1289        /**
1290         * Adds a new service so that it will be started as part of the broker
1291         * lifecycle
1292         */
1293        public void addService(Service service) {
1294            services.add(service);
1295        }
1296    
1297        public void removeService(Service service) {
1298            services.remove(service);
1299        }
1300    
1301        public boolean isUseLoggingForShutdownErrors() {
1302            return useLoggingForShutdownErrors;
1303        }
1304    
1305        /**
1306         * Sets whether or not we should use commons-logging when reporting errors
1307         * when shutting down the broker
1308         */
1309        public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1310            this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1311        }
1312    
1313        public boolean isUseShutdownHook() {
1314            return useShutdownHook;
1315        }
1316    
1317        /**
1318         * Sets whether or not we should use a shutdown handler to close down the
1319         * broker cleanly if the JVM is terminated. It is recommended you leave this
1320         * enabled.
1321         */
1322        public void setUseShutdownHook(boolean useShutdownHook) {
1323            this.useShutdownHook = useShutdownHook;
1324        }
1325    
1326        public boolean isAdvisorySupport() {
1327            return advisorySupport;
1328        }
1329    
1330        /**
1331         * Allows the support of advisory messages to be disabled for performance
1332         * reasons.
1333         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1334         */
1335        public void setAdvisorySupport(boolean advisorySupport) {
1336            this.advisorySupport = advisorySupport;
1337        }
1338    
1339        public List<TransportConnector> getTransportConnectors() {
1340            return new ArrayList<TransportConnector>(transportConnectors);
1341        }
1342    
1343        /**
1344         * Sets the transport connectors which this broker will listen on for new
1345         * clients
1346         *
1347         * @org.apache.xbean.Property
1348         *                            nestedType="org.apache.activemq.broker.TransportConnector"
1349         */
1350        public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1351            for (TransportConnector connector : transportConnectors) {
1352                addConnector(connector);
1353            }
1354        }
1355    
1356        public TransportConnector getTransportConnectorByName(String name){
1357            for (TransportConnector transportConnector : transportConnectors){
1358               if (name.equals(transportConnector.getName())){
1359                   return transportConnector;
1360               }
1361            }
1362            return null;
1363        }
1364    
1365        public TransportConnector getTransportConnectorByScheme(String scheme){
1366            for (TransportConnector transportConnector : transportConnectors){
1367                if (scheme.equals(transportConnector.getUri().getScheme())){
1368                    return transportConnector;
1369                }
1370            }
1371            return null;
1372        }
1373    
1374        public List<NetworkConnector> getNetworkConnectors() {
1375            return new ArrayList<NetworkConnector>(networkConnectors);
1376        }
1377    
1378        public List<ProxyConnector> getProxyConnectors() {
1379            return new ArrayList<ProxyConnector>(proxyConnectors);
1380        }
1381    
1382        /**
1383         * Sets the network connectors which this broker will use to connect to
1384         * other brokers in a federated network
1385         *
1386         * @org.apache.xbean.Property
1387         *                            nestedType="org.apache.activemq.network.NetworkConnector"
1388         */
1389        public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
1390            for (Object connector : networkConnectors) {
1391                addNetworkConnector((NetworkConnector) connector);
1392            }
1393        }
1394    
1395        /**
1396         * Sets the network connectors which this broker will use to connect to
1397         * other brokers in a federated network
1398         */
1399        public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
1400            for (Object connector : proxyConnectors) {
1401                addProxyConnector((ProxyConnector) connector);
1402            }
1403        }
1404    
1405        public PolicyMap getDestinationPolicy() {
1406            return destinationPolicy;
1407        }
1408    
1409        /**
1410         * Sets the destination specific policies available either for exact
1411         * destinations or for wildcard areas of destinations.
1412         */
1413        public void setDestinationPolicy(PolicyMap policyMap) {
1414            this.destinationPolicy = policyMap;
1415        }
1416    
1417        public BrokerPlugin[] getPlugins() {
1418            return plugins;
1419        }
1420    
1421        /**
1422         * Sets a number of broker plugins to install such as for security
1423         * authentication or authorization
1424         */
1425        public void setPlugins(BrokerPlugin[] plugins) {
1426            this.plugins = plugins;
1427        }
1428    
1429        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1430            return messageAuthorizationPolicy;
1431        }
1432    
1433        /**
1434         * Sets the policy used to decide if the current connection is authorized to
1435         * consume a given message
1436         */
1437        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1438            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1439        }
1440    
1441        /**
1442         * Delete all messages from the persistent store
1443         *
1444         * @throws IOException
1445         */
1446        public void deleteAllMessages() throws IOException {
1447            getPersistenceAdapter().deleteAllMessages();
1448        }
1449    
1450        public boolean isDeleteAllMessagesOnStartup() {
1451            return deleteAllMessagesOnStartup;
1452        }
1453    
1454        /**
1455         * Sets whether or not all messages are deleted on startup - mostly only
1456         * useful for testing.
1457         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1458         */
1459        public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1460            this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1461        }
1462    
1463        public URI getVmConnectorURI() {
1464            if (vmConnectorURI == null) {
1465                try {
1466                    vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1467                } catch (URISyntaxException e) {
1468                    LOG.error("Badly formed URI from " + getBrokerName(), e);
1469                }
1470            }
1471            return vmConnectorURI;
1472        }
1473    
1474        public void setVmConnectorURI(URI vmConnectorURI) {
1475            this.vmConnectorURI = vmConnectorURI;
1476        }
1477    
1478        public String getDefaultSocketURIString() {
1479            if (started.get()) {
1480                if (this.defaultSocketURIString == null) {
1481                    for (TransportConnector tc:this.transportConnectors) {
1482                        String result = null;
1483                        try {
1484                            result = tc.getPublishableConnectString();
1485                        } catch (Exception e) {
1486                          LOG.warn("Failed to get the ConnectURI for "+tc,e);
1487                        }
1488                        if (result != null) {
1489                            // find first publishable uri
1490                            if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
1491                                this.defaultSocketURIString = result;
1492                                break;
1493                            } else {
1494                            // or use the first defined
1495                                if (this.defaultSocketURIString == null) {
1496                                    this.defaultSocketURIString = result;
1497                                }
1498                            }
1499                        }
1500                    }
1501    
1502                }
1503                return this.defaultSocketURIString;
1504            }
1505           return null;
1506        }
1507    
1508        /**
1509         * @return Returns the shutdownOnMasterFailure.
1510         */
1511        public boolean isShutdownOnMasterFailure() {
1512            return shutdownOnMasterFailure;
1513        }
1514    
1515        /**
1516         * @param shutdownOnMasterFailure
1517         *            The shutdownOnMasterFailure to set.
1518         */
1519        public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1520            this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1521        }
1522    
1523        public boolean isKeepDurableSubsActive() {
1524            return keepDurableSubsActive;
1525        }
1526    
1527        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1528            this.keepDurableSubsActive = keepDurableSubsActive;
1529        }
1530    
1531        public boolean isUseVirtualTopics() {
1532            return useVirtualTopics;
1533        }
1534    
1535        /**
1536         * Sets whether or not <a
1537         * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1538         * Topics</a> should be supported by default if they have not been
1539         * explicitly configured.
1540         */
1541        public void setUseVirtualTopics(boolean useVirtualTopics) {
1542            this.useVirtualTopics = useVirtualTopics;
1543        }
1544    
1545        public DestinationInterceptor[] getDestinationInterceptors() {
1546            return destinationInterceptors;
1547        }
1548    
1549        public boolean isUseMirroredQueues() {
1550            return useMirroredQueues;
1551        }
1552    
1553        /**
1554         * Sets whether or not <a
1555         * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1556         * Queues</a> should be supported by default if they have not been
1557         * explicitly configured.
1558         */
1559        public void setUseMirroredQueues(boolean useMirroredQueues) {
1560            this.useMirroredQueues = useMirroredQueues;
1561        }
1562    
1563        /**
1564         * Sets the destination interceptors to use
1565         */
1566        public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1567            this.destinationInterceptors = destinationInterceptors;
1568        }
1569    
1570        public ActiveMQDestination[] getDestinations() {
1571            return destinations;
1572        }
1573    
1574        /**
1575         * Sets the destinations which should be loaded/created on startup
1576         */
1577        public void setDestinations(ActiveMQDestination[] destinations) {
1578            this.destinations = destinations;
1579        }
1580    
1581        /**
1582         * @return the tempDataStore
1583         */
1584        public synchronized PListStore getTempDataStore() {
1585            if (tempDataStore == null) {
1586                if (!isPersistent()) {
1587                    return null;
1588                }
1589    
1590                try {
1591                    PersistenceAdapter pa = getPersistenceAdapter();
1592                    if( pa!=null && pa instanceof PListStore) {
1593                        return (PListStore) pa;
1594                    }
1595                } catch (IOException e) {
1596                    throw new RuntimeException(e);
1597                }
1598    
1599                boolean result = true;
1600                boolean empty = true;
1601                try {
1602                    File directory = getTmpDataDirectory();
1603                    if (directory.exists() && directory.isDirectory()) {
1604                        File[] files = directory.listFiles();
1605                        if (files != null && files.length > 0) {
1606                            empty = false;
1607                            for (int i = 0; i < files.length; i++) {
1608                                File file = files[i];
1609                                if (!file.isDirectory()) {
1610                                    result &= file.delete();
1611                                }
1612                            }
1613                        }
1614                    }
1615                    if (!empty) {
1616                        String str = result ? "Successfully deleted" : "Failed to delete";
1617                        LOG.info(str + " temporary storage");
1618                    }
1619    
1620                    String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
1621                    this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance();
1622                    this.tempDataStore.setDirectory(getTmpDataDirectory());
1623                    configureService(tempDataStore);
1624                    this.tempDataStore.start();
1625                } catch (Exception e) {
1626                    throw new RuntimeException(e);
1627                }
1628            }
1629            return tempDataStore;
1630        }
1631    
1632        /**
1633         * @param tempDataStore
1634         *            the tempDataStore to set
1635         */
1636        public void setTempDataStore(PListStore tempDataStore) {
1637            this.tempDataStore = tempDataStore;
1638            configureService(tempDataStore);
1639            try {
1640                tempDataStore.start();
1641            } catch (Exception e) {
1642                RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e);
1643                LOG.error(exception.getLocalizedMessage(), e);
1644                throw exception;
1645            }
1646        }
1647    
1648        public int getPersistenceThreadPriority() {
1649            return persistenceThreadPriority;
1650        }
1651    
1652        public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1653            this.persistenceThreadPriority = persistenceThreadPriority;
1654        }
1655    
1656        /**
1657         * @return the useLocalHostBrokerName
1658         */
1659        public boolean isUseLocalHostBrokerName() {
1660            return this.useLocalHostBrokerName;
1661        }
1662    
1663        /**
1664         * @param useLocalHostBrokerName
1665         *            the useLocalHostBrokerName to set
1666         */
1667        public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1668            this.useLocalHostBrokerName = useLocalHostBrokerName;
1669            if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1670                brokerName = LOCAL_HOST_NAME;
1671            }
1672        }
1673    
1674        /**
1675         * @return the supportFailOver
1676         */
1677        public boolean isSupportFailOver() {
1678            return this.supportFailOver;
1679        }
1680    
1681        /**
1682         * @param supportFailOver
1683         *            the supportFailOver to set
1684         */
1685        public void setSupportFailOver(boolean supportFailOver) {
1686            this.supportFailOver = supportFailOver;
1687        }
1688    
1689        /**
1690         * Looks up and lazily creates if necessary the destination for the given
1691         * JMS name
1692         */
1693        public Destination getDestination(ActiveMQDestination destination) throws Exception {
1694            return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1695        }
1696    
1697        public void removeDestination(ActiveMQDestination destination) throws Exception {
1698            getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1699        }
1700    
1701        public int getProducerSystemUsagePortion() {
1702            return producerSystemUsagePortion;
1703        }
1704    
1705        public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1706            this.producerSystemUsagePortion = producerSystemUsagePortion;
1707        }
1708    
1709        public int getConsumerSystemUsagePortion() {
1710            return consumerSystemUsagePortion;
1711        }
1712    
1713        public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1714            this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1715        }
1716    
1717        public boolean isSplitSystemUsageForProducersConsumers() {
1718            return splitSystemUsageForProducersConsumers;
1719        }
1720    
1721        public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1722            this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1723        }
1724    
1725        public boolean isMonitorConnectionSplits() {
1726            return monitorConnectionSplits;
1727        }
1728    
1729        public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1730            this.monitorConnectionSplits = monitorConnectionSplits;
1731        }
1732    
1733        public int getTaskRunnerPriority() {
1734            return taskRunnerPriority;
1735        }
1736    
1737        public void setTaskRunnerPriority(int taskRunnerPriority) {
1738            this.taskRunnerPriority = taskRunnerPriority;
1739        }
1740    
1741        public boolean isDedicatedTaskRunner() {
1742            return dedicatedTaskRunner;
1743        }
1744    
1745        public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1746            this.dedicatedTaskRunner = dedicatedTaskRunner;
1747        }
1748    
1749        public boolean isCacheTempDestinations() {
1750            return cacheTempDestinations;
1751        }
1752    
1753        public void setCacheTempDestinations(boolean cacheTempDestinations) {
1754            this.cacheTempDestinations = cacheTempDestinations;
1755        }
1756    
1757        public int getTimeBeforePurgeTempDestinations() {
1758            return timeBeforePurgeTempDestinations;
1759        }
1760    
1761        public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1762            this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1763        }
1764    
1765        public boolean isUseTempMirroredQueues() {
1766            return useTempMirroredQueues;
1767        }
1768    
1769        public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1770            this.useTempMirroredQueues = useTempMirroredQueues;
1771        }
1772    
1773        public synchronized JobSchedulerStore getJobSchedulerStore() {
1774            if (jobSchedulerStore == null && isSchedulerSupport()) {
1775                try {
1776                    String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
1777                    jobSchedulerStore = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
1778                    jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
1779                    configureService(jobSchedulerStore);
1780                    jobSchedulerStore.start();
1781                    LOG.info("JobScheduler using directory: " + getSchedulerDirectoryFile());
1782                } catch (Exception e) {
1783                    throw new RuntimeException(e);
1784                }
1785    
1786            }
1787            return jobSchedulerStore;
1788        }
1789    
1790        public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
1791            this.jobSchedulerStore = jobSchedulerStore;
1792            configureService(jobSchedulerStore);
1793            try {
1794                jobSchedulerStore.start();
1795            } catch (Exception e) {
1796                RuntimeException exception = new RuntimeException(
1797                        "Failed to start provided job scheduler store: " + jobSchedulerStore, e);
1798                LOG.error(exception.getLocalizedMessage(), e);
1799                throw exception;
1800            }
1801        }
1802    
1803        //
1804        // Implementation methods
1805        // -------------------------------------------------------------------------
1806        /**
1807         * Handles any lazy-creation helper properties which are added to make
1808         * things easier to configure inside environments such as Spring
1809         *
1810         * @throws Exception
1811         */
1812        protected void processHelperProperties() throws Exception {
1813            if (transportConnectorURIs != null) {
1814                for (int i = 0; i < transportConnectorURIs.length; i++) {
1815                    String uri = transportConnectorURIs[i];
1816                    addConnector(uri);
1817                }
1818            }
1819            if (networkConnectorURIs != null) {
1820                for (int i = 0; i < networkConnectorURIs.length; i++) {
1821                    String uri = networkConnectorURIs[i];
1822                    addNetworkConnector(uri);
1823                }
1824            }
1825            if (jmsBridgeConnectors != null) {
1826                for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1827                    addJmsConnector(jmsBridgeConnectors[i]);
1828                }
1829            }
1830        }
1831    
1832        protected void checkSystemUsageLimits() throws IOException {
1833            SystemUsage usage = getSystemUsage();
1834            long memLimit = usage.getMemoryUsage().getLimit();
1835            long jvmLimit = Runtime.getRuntime().maxMemory();
1836    
1837            if (memLimit > jvmLimit) {
1838                LOG.error("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
1839                          " mb) is more than the maximum available for the JVM: " +
1840                          jvmLimit / (1024 * 1024) + " mb");
1841            }
1842    
1843            if (getPersistenceAdapter() != null) {
1844                PersistenceAdapter adapter = getPersistenceAdapter();
1845                File dir = adapter.getDirectory();
1846    
1847                if (dir != null) {
1848                    String dirPath = dir.getAbsolutePath();
1849                    if (!dir.isAbsolute()) {
1850                        dir = new File(dirPath);
1851                    }
1852    
1853                    while (dir != null && dir.isDirectory() == false) {
1854                        dir = dir.getParentFile();
1855                    }
1856                    long storeLimit = usage.getStoreUsage().getLimit();
1857                    long dirFreeSpace = dir.getUsableSpace();
1858                    if (storeLimit > dirFreeSpace) {
1859                        LOG.warn("Store limit is " + storeLimit / (1024 * 1024) +
1860                                 " mb, whilst the data directory: " + dir.getAbsolutePath() +
1861                                 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1862                    }
1863                }
1864    
1865                long maxJournalFileSize = 0;
1866                long storeLimit = usage.getStoreUsage().getLimit();
1867    
1868                if (adapter instanceof JournaledStore) {
1869                    maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
1870                }
1871    
1872                if (storeLimit < maxJournalFileSize) {
1873                    LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
1874                              " mb, whilst the max journal file size for the store is: " +
1875                              maxJournalFileSize / (1024 * 1024) + " mb, " +
1876                              "the store will not accept any data when used.");
1877                }
1878            }
1879    
1880            File tmpDir = getTmpDataDirectory();
1881            if (tmpDir != null) {
1882    
1883                String tmpDirPath = tmpDir.getAbsolutePath();
1884                if (!tmpDir.isAbsolute()) {
1885                    tmpDir = new File(tmpDirPath);
1886                }
1887    
1888                long storeLimit = usage.getTempUsage().getLimit();
1889                while (tmpDir != null && tmpDir.isDirectory() == false) {
1890                    tmpDir = tmpDir.getParentFile();
1891                }
1892                long dirFreeSpace = tmpDir.getUsableSpace();
1893                if (storeLimit > dirFreeSpace) {
1894                    LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
1895                              " mb, whilst the temporary data directory: " + tmpDirPath +
1896                              " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1897                }
1898    
1899                if (isPersistent()) {
1900                    long maxJournalFileSize;
1901    
1902                    PListStore store = usage.getTempUsage().getStore();
1903                    if (store != null && store instanceof JournaledStore) {
1904                        maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
1905                    } else {
1906                        maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
1907                    }
1908    
1909                    if (storeLimit < maxJournalFileSize) {
1910                        LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
1911                                  " mb, whilst the max journal file size for the temporary store is: " +
1912                                  maxJournalFileSize / (1024 * 1024) + " mb, " +
1913                                  "the temp store will not accept any data when used.");
1914                    }
1915                }
1916            }
1917    
1918            if (getJobSchedulerStore() != null) {
1919                JobSchedulerStore scheduler = getJobSchedulerStore();
1920                File schedulerDir = scheduler.getDirectory();
1921                if (schedulerDir != null) {
1922    
1923                    String schedulerDirPath = schedulerDir.getAbsolutePath();
1924                    if (!schedulerDir.isAbsolute()) {
1925                        schedulerDir = new File(schedulerDirPath);
1926                    }
1927    
1928                    while (schedulerDir != null && schedulerDir.isDirectory() == false) {
1929                        schedulerDir = schedulerDir.getParentFile();
1930                    }
1931                    long schedularLimit = usage.getJobSchedulerUsage().getLimit();
1932                    long dirFreeSpace = schedulerDir.getUsableSpace();
1933                    if (schedularLimit > dirFreeSpace) {
1934                        LOG.warn("Job Schedular Store limit is " + schedularLimit / (1024 * 1024) +
1935                                 " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
1936                                 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1937                    }
1938                }
1939            }
1940        }
1941    
1942        public void stopAllConnectors(ServiceStopper stopper) {
1943            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1944                NetworkConnector connector = iter.next();
1945                unregisterNetworkConnectorMBean(connector);
1946                stopper.stop(connector);
1947            }
1948            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1949                ProxyConnector connector = iter.next();
1950                stopper.stop(connector);
1951            }
1952            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1953                JmsConnector connector = iter.next();
1954                stopper.stop(connector);
1955            }
1956            for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1957                TransportConnector connector = iter.next();
1958                stopper.stop(connector);
1959            }
1960        }
1961    
1962        protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
1963            try {
1964                ObjectName objectName = createConnectorObjectName(connector);
1965                connector = connector.asManagedConnector(getManagementContext(), objectName);
1966                ConnectorViewMBean view = new ConnectorView(connector);
1967                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1968                return connector;
1969            } catch (Throwable e) {
1970                throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1971            }
1972        }
1973    
1974        protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
1975            if (isUseJmx()) {
1976                try {
1977                    ObjectName objectName = createConnectorObjectName(connector);
1978                    getManagementContext().unregisterMBean(objectName);
1979                } catch (Throwable e) {
1980                    throw IOExceptionSupport.create(
1981                            "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
1982                }
1983            }
1984        }
1985    
1986        protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1987            return adaptor;
1988        }
1989    
1990        protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1991            if (isUseJmx()) {}
1992        }
1993    
1994        private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
1995            return BrokerMBeanSupport.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName());
1996        }
1997    
1998        protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
1999            NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
2000            try {
2001                ObjectName objectName = createNetworkConnectorObjectName(connector);
2002                connector.setObjectName(objectName);
2003                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2004            } catch (Throwable e) {
2005                throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
2006            }
2007        }
2008    
2009        protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
2010            return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
2011        }
2012    
2013        public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
2014            return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport.toString());
2015        }
2016    
2017        protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
2018            if (isUseJmx()) {
2019                try {
2020                    ObjectName objectName = createNetworkConnectorObjectName(connector);
2021                    getManagementContext().unregisterMBean(objectName);
2022                } catch (Exception e) {
2023                    LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
2024                }
2025            }
2026        }
2027    
2028        protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
2029            ProxyConnectorView view = new ProxyConnectorView(connector);
2030            try {
2031                ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName());
2032                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2033            } catch (Throwable e) {
2034                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2035            }
2036        }
2037    
2038        protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
2039            JmsConnectorView view = new JmsConnectorView(connector);
2040            try {
2041                ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName());
2042                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2043            } catch (Throwable e) {
2044                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2045            }
2046        }
2047    
2048        /**
2049         * Factory method to create a new broker
2050         *
2051         * @throws Exception
2052         * @throws
2053         * @throws
2054         */
2055        protected Broker createBroker() throws Exception {
2056            regionBroker = createRegionBroker();
2057            Broker broker = addInterceptors(regionBroker);
2058            // Add a filter that will stop access to the broker once stopped
2059            broker = new MutableBrokerFilter(broker) {
2060                Broker old;
2061    
2062                @Override
2063                public void stop() throws Exception {
2064                    old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
2065                        // Just ignore additional stop actions.
2066                        @Override
2067                        public void stop() throws Exception {
2068                        }
2069                    });
2070                    old.stop();
2071                }
2072    
2073                @Override
2074                public void start() throws Exception {
2075                    if (forceStart && old != null) {
2076                        this.next.set(old);
2077                    }
2078                    getNext().start();
2079                }
2080            };
2081            return broker;
2082        }
2083    
2084        /**
2085         * Factory method to create the core region broker onto which interceptors
2086         * are added
2087         *
2088         * @throws Exception
2089         */
2090        protected Broker createRegionBroker() throws Exception {
2091            if (destinationInterceptors == null) {
2092                destinationInterceptors = createDefaultDestinationInterceptor();
2093            }
2094            configureServices(destinationInterceptors);
2095            DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
2096            if (destinationFactory == null) {
2097                destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
2098            }
2099            return createRegionBroker(destinationInterceptor);
2100        }
2101    
2102        protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
2103            RegionBroker regionBroker;
2104            if (isUseJmx()) {
2105                try {
2106                    regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
2107                        getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
2108                } catch(MalformedObjectNameException me){
2109                    LOG.error("Couldn't create ManagedRegionBroker",me);
2110                    throw new IOException(me);
2111                }
2112            } else {
2113                regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
2114                        destinationInterceptor,getScheduler(),getExecutor());
2115            }
2116            destinationFactory.setRegionBroker(regionBroker);
2117            regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
2118            regionBroker.setBrokerName(getBrokerName());
2119            regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
2120            regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
2121            if (brokerId != null) {
2122                regionBroker.setBrokerId(brokerId);
2123            }
2124            return regionBroker;
2125        }
2126    
2127        /**
2128         * Create the default destination interceptor
2129         */
2130        protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
2131            List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
2132            if (isUseVirtualTopics()) {
2133                VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
2134                VirtualTopic virtualTopic = new VirtualTopic();
2135                virtualTopic.setName("VirtualTopic.>");
2136                VirtualDestination[] virtualDestinations = { virtualTopic };
2137                interceptor.setVirtualDestinations(virtualDestinations);
2138                answer.add(interceptor);
2139            }
2140            if (isUseMirroredQueues()) {
2141                MirroredQueue interceptor = new MirroredQueue();
2142                answer.add(interceptor);
2143            }
2144            DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
2145            answer.toArray(array);
2146            return array;
2147        }
2148    
2149        /**
2150         * Strategy method to add interceptors to the broker
2151         *
2152         * @throws IOException
2153         */
2154        protected Broker addInterceptors(Broker broker) throws Exception {
2155            if (isSchedulerSupport()) {
2156                SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
2157                if (isUseJmx()) {
2158                    JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
2159                    try {
2160                        ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName());
2161                        AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2162                        this.adminView.setJMSJobScheduler(objectName);
2163                    } catch (Throwable e) {
2164                        throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
2165                                + e.getMessage(), e);
2166                    }
2167                }
2168                broker = sb;
2169            }
2170            if (isUseJmx()) {
2171                HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker());
2172                try {
2173                    ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName());
2174                    AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName);
2175                } catch (Throwable e) {
2176                    throw IOExceptionSupport.create("Status MBean could not be registered in JMX: "
2177                            + e.getMessage(), e);
2178                }
2179            }
2180            if (isAdvisorySupport()) {
2181                broker = new AdvisoryBroker(broker);
2182            }
2183            broker = new CompositeDestinationBroker(broker);
2184            broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
2185            if (isPopulateJMSXUserID()) {
2186                UserIDBroker userIDBroker = new UserIDBroker(broker);
2187                userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
2188                broker = userIDBroker;
2189            }
2190            if (isMonitorConnectionSplits()) {
2191                broker = new ConnectionSplitBroker(broker);
2192            }
2193            if (plugins != null) {
2194                for (int i = 0; i < plugins.length; i++) {
2195                    BrokerPlugin plugin = plugins[i];
2196                    broker = plugin.installPlugin(broker);
2197                }
2198            }
2199            return broker;
2200        }
2201    
2202        protected PersistenceAdapter createPersistenceAdapter() throws IOException {
2203            if (isPersistent()) {
2204                PersistenceAdapterFactory fac = getPersistenceFactory();
2205                if (fac != null) {
2206                    return fac.createPersistenceAdapter();
2207                } else {
2208                    try {
2209                        String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
2210                        PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
2211                        File dir = new File(getBrokerDataDirectory(),"KahaDB");
2212                        adaptor.setDirectory(dir);
2213                        return adaptor;
2214                    } catch (Throwable e) {
2215                        throw IOExceptionSupport.create(e);
2216                    }
2217                }
2218            } else {
2219                return new MemoryPersistenceAdapter();
2220            }
2221        }
2222    
2223        protected ObjectName createBrokerObjectName() throws MalformedObjectNameException  {
2224            return BrokerMBeanSupport.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName());
2225        }
2226    
2227        protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2228            TransportServer transport = TransportFactorySupport.bind(this, brokerURI);
2229            return new TransportConnector(transport);
2230        }
2231    
2232        /**
2233         * Extracts the port from the options
2234         */
2235        protected Object getPort(Map<?,?> options) {
2236            Object port = options.get("port");
2237            if (port == null) {
2238                port = DEFAULT_PORT;
2239                LOG.warn("No port specified so defaulting to: " + port);
2240            }
2241            return port;
2242        }
2243    
2244        protected void addShutdownHook() {
2245            if (useShutdownHook) {
2246                shutdownHook = new Thread("ActiveMQ ShutdownHook") {
2247                    @Override
2248                    public void run() {
2249                        containerShutdown();
2250                    }
2251                };
2252                Runtime.getRuntime().addShutdownHook(shutdownHook);
2253            }
2254        }
2255    
2256        protected void removeShutdownHook() {
2257            if (shutdownHook != null) {
2258                try {
2259                    Runtime.getRuntime().removeShutdownHook(shutdownHook);
2260                } catch (Exception e) {
2261                    LOG.debug("Caught exception, must be shutting down: " + e);
2262                }
2263            }
2264        }
2265    
2266        /**
2267         * Sets hooks to be executed when broker shut down
2268         *
2269         * @org.apache.xbean.Property
2270         */
2271        public void setShutdownHooks(List<Runnable> hooks) throws Exception {
2272            for (Runnable hook : hooks) {
2273                addShutdownHook(hook);
2274            }
2275        }
2276    
2277        /**
2278         * Causes a clean shutdown of the container when the VM is being shut down
2279         */
2280        protected void containerShutdown() {
2281            try {
2282                stop();
2283            } catch (IOException e) {
2284                Throwable linkedException = e.getCause();
2285                if (linkedException != null) {
2286                    logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2287                } else {
2288                    logError("Failed to shut down: " + e, e);
2289                }
2290                if (!useLoggingForShutdownErrors) {
2291                    e.printStackTrace(System.err);
2292                }
2293            } catch (Exception e) {
2294                logError("Failed to shut down: " + e, e);
2295            }
2296        }
2297    
2298        protected void logError(String message, Throwable e) {
2299            if (useLoggingForShutdownErrors) {
2300                LOG.error("Failed to shut down: " + e);
2301            } else {
2302                System.err.println("Failed to shut down: " + e);
2303            }
2304        }
2305    
2306        /**
2307         * Starts any configured destinations on startup
2308         */
2309        protected void startDestinations() throws Exception {
2310            if (destinations != null) {
2311                ConnectionContext adminConnectionContext = getAdminConnectionContext();
2312                for (int i = 0; i < destinations.length; i++) {
2313                    ActiveMQDestination destination = destinations[i];
2314                    getBroker().addDestination(adminConnectionContext, destination,true);
2315                }
2316            }
2317            if (isUseVirtualTopics()) {
2318                startVirtualConsumerDestinations();
2319            }
2320        }
2321    
2322        /**
2323         * Returns the broker's administration connection context used for
2324         * configuring the broker at startup
2325         */
2326        public ConnectionContext getAdminConnectionContext() throws Exception {
2327            return BrokerSupport.getConnectionContext(getBroker());
2328        }
2329    
2330        protected void startManagementContext() throws Exception {
2331            getManagementContext().setBrokerName(brokerName);
2332            getManagementContext().start();
2333            adminView = new BrokerView(this, null);
2334            ObjectName objectName = getBrokerObjectName();
2335            AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2336        }
2337    
2338        /**
2339         * Start all transport and network connections, proxies and bridges
2340         *
2341         * @throws Exception
2342         */
2343        public void startAllConnectors() throws Exception {
2344            Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2345            List<TransportConnector> al = new ArrayList<TransportConnector>();
2346            for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2347                TransportConnector connector = iter.next();
2348                connector.setBrokerService(this);
2349                al.add(startTransportConnector(connector));
2350            }
2351            if (al.size() > 0) {
2352                // let's clear the transportConnectors list and replace it with
2353                // the started transportConnector instances
2354                this.transportConnectors.clear();
2355                setTransportConnectors(al);
2356            }
2357            URI uri = getVmConnectorURI();
2358            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
2359            map.put("network", "true");
2360            map.put("async", "false");
2361            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
2362    
2363            if (!stopped.get()) {
2364                ThreadPoolExecutor networkConnectorStartExecutor = null;
2365                if (isNetworkConnectorStartAsync()) {
2366                    // spin up as many threads as needed
2367                    networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2368                        10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2369                        new ThreadFactory() {
2370                            int count=0;
2371                            @Override
2372                            public Thread newThread(Runnable runnable) {
2373                                Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2374                                thread.setDaemon(true);
2375                                return thread;
2376                            }
2377                        });
2378                }
2379    
2380                for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2381                    final NetworkConnector connector = iter.next();
2382                    connector.setLocalUri(uri);
2383                    connector.setBrokerName(getBrokerName());
2384                    connector.setDurableDestinations(durableDestinations);
2385                    if (getDefaultSocketURIString() != null) {
2386                        connector.setBrokerURL(getDefaultSocketURIString());
2387                    }
2388                    if (networkConnectorStartExecutor != null) {
2389                        networkConnectorStartExecutor.execute(new Runnable() {
2390                            @Override
2391                            public void run() {
2392                                try {
2393                                    LOG.info("Async start of " + connector);
2394                                    connector.start();
2395                                } catch(Exception e) {
2396                                    LOG.error("Async start of network connector: " + connector + " failed", e);
2397                                }
2398                            }
2399                        });
2400                    } else {
2401                        connector.start();
2402                    }
2403                }
2404                if (networkConnectorStartExecutor != null) {
2405                    // executor done when enqueued tasks are complete
2406                    ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
2407                }
2408    
2409                for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2410                    ProxyConnector connector = iter.next();
2411                    connector.start();
2412                }
2413                for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2414                    JmsConnector connector = iter.next();
2415                    connector.start();
2416                }
2417                for (Service service : services) {
2418                    configureService(service);
2419                    service.start();
2420                }
2421            }
2422        }
2423    
2424        protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2425            connector.setTaskRunnerFactory(getTaskRunnerFactory());
2426            MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2427            if (policy != null) {
2428                connector.setMessageAuthorizationPolicy(policy);
2429            }
2430            if (isUseJmx()) {
2431                connector = registerConnectorMBean(connector);
2432            }
2433            connector.getStatistics().setEnabled(enableStatistics);
2434            connector.start();
2435            return connector;
2436        }
2437    
2438        /**
2439         * Perform any custom dependency injection
2440         */
2441        protected void configureServices(Object[] services) {
2442            for (Object service : services) {
2443                configureService(service);
2444            }
2445        }
2446    
2447        /**
2448         * Perform any custom dependency injection
2449         */
2450        protected void configureService(Object service) {
2451            if (service instanceof BrokerServiceAware) {
2452                BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2453                serviceAware.setBrokerService(this);
2454            }
2455        }
2456    
2457        public void handleIOException(IOException exception) {
2458            if (ioExceptionHandler != null) {
2459                ioExceptionHandler.handle(exception);
2460             } else {
2461                LOG.info("No IOExceptionHandler registered, ignoring IO exception, " + exception, exception);
2462             }
2463        }
2464    
2465        protected void startVirtualConsumerDestinations() throws Exception {
2466            ConnectionContext adminConnectionContext = getAdminConnectionContext();
2467            Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
2468            DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
2469            if (!destinations.isEmpty()) {
2470                for (ActiveMQDestination destination : destinations) {
2471                    if (filter.matches(destination) == true) {
2472                        broker.addDestination(adminConnectionContext, destination, false);
2473                    }
2474                }
2475            }
2476        }
2477    
2478        private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
2479            // created at startup, so no sync needed
2480            if (virtualConsumerDestinationFilter == null) {
2481                Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
2482                if (destinationInterceptors != null) {
2483                    for (DestinationInterceptor interceptor : destinationInterceptors) {
2484                        if (interceptor instanceof VirtualDestinationInterceptor) {
2485                            VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
2486                            for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
2487                                if (virtualDestination instanceof VirtualTopic) {
2488                                    consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
2489                                }
2490                            }
2491                        }
2492                    }
2493                }
2494                ActiveMQQueue filter = new ActiveMQQueue();
2495                filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
2496                virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
2497            }
2498            return virtualConsumerDestinationFilter;
2499        }
2500    
2501        protected synchronized ThreadPoolExecutor getExecutor() {
2502            if (this.executor == null) {
2503                this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2504    
2505                    private long i = 0;
2506    
2507                    @Override
2508                    public Thread newThread(Runnable runnable) {
2509                        this.i++;
2510                        Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i);
2511                        thread.setDaemon(true);
2512                        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
2513                            @Override
2514                            public void uncaughtException(final Thread t, final Throwable e) {
2515                                LOG.error("Error in thread '{}'", t.getName(), e);
2516                            }
2517                        });
2518                        return thread;
2519                    }
2520                }, new RejectedExecutionHandler() {
2521                    @Override
2522                    public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
2523                        try {
2524                            executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
2525                        } catch (InterruptedException e) {
2526                            throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
2527                        }
2528    
2529                        throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
2530                    }
2531                });
2532            }
2533            return this.executor;
2534        }
2535    
2536        public synchronized Scheduler getScheduler() {
2537            if (this.scheduler==null) {
2538                this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2539                try {
2540                    this.scheduler.start();
2541                } catch (Exception e) {
2542                   LOG.error("Failed to start Scheduler ",e);
2543                }
2544            }
2545            return this.scheduler;
2546        }
2547    
2548        public Broker getRegionBroker() {
2549            return regionBroker;
2550        }
2551    
2552        public void setRegionBroker(Broker regionBroker) {
2553            this.regionBroker = regionBroker;
2554        }
2555    
2556        public void addShutdownHook(Runnable hook) {
2557            synchronized (shutdownHooks) {
2558                shutdownHooks.add(hook);
2559            }
2560        }
2561    
2562        public void removeShutdownHook(Runnable hook) {
2563            synchronized (shutdownHooks) {
2564                shutdownHooks.remove(hook);
2565            }
2566        }
2567    
2568        public boolean isSystemExitOnShutdown() {
2569            return systemExitOnShutdown;
2570        }
2571    
2572        /**
2573         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2574         */
2575        public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2576            this.systemExitOnShutdown = systemExitOnShutdown;
2577        }
2578    
2579        public int getSystemExitOnShutdownExitCode() {
2580            return systemExitOnShutdownExitCode;
2581        }
2582    
2583        public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2584            this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2585        }
2586    
2587        public SslContext getSslContext() {
2588            return sslContext;
2589        }
2590    
2591        public void setSslContext(SslContext sslContext) {
2592            this.sslContext = sslContext;
2593        }
2594    
2595        public boolean isShutdownOnSlaveFailure() {
2596            return shutdownOnSlaveFailure;
2597        }
2598    
2599        /**
2600         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2601         */
2602        public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2603            this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2604        }
2605    
2606        public boolean isWaitForSlave() {
2607            return waitForSlave;
2608        }
2609    
2610        /**
2611         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2612         */
2613        public void setWaitForSlave(boolean waitForSlave) {
2614            this.waitForSlave = waitForSlave;
2615        }
2616    
2617        public long getWaitForSlaveTimeout() {
2618            return this.waitForSlaveTimeout;
2619        }
2620    
2621        public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
2622            this.waitForSlaveTimeout = waitForSlaveTimeout;
2623        }
2624    
2625        /**
2626         * Get the passiveSlave
2627         * @return the passiveSlave
2628         */
2629        public boolean isPassiveSlave() {
2630            return this.passiveSlave;
2631        }
2632    
2633        /**
2634         * Set the passiveSlave
2635         * @param passiveSlave the passiveSlave to set
2636         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2637         */
2638        public void setPassiveSlave(boolean passiveSlave) {
2639            this.passiveSlave = passiveSlave;
2640        }
2641    
2642        /**
2643         * override the Default IOException handler, called when persistence adapter
2644         * has experiences File or JDBC I/O Exceptions
2645         *
2646         * @param ioExceptionHandler
2647         */
2648        public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2649            configureService(ioExceptionHandler);
2650            this.ioExceptionHandler = ioExceptionHandler;
2651        }
2652    
2653        public IOExceptionHandler getIoExceptionHandler() {
2654            return ioExceptionHandler;
2655        }
2656    
2657        /**
2658         * @return the schedulerSupport
2659         */
2660        public boolean isSchedulerSupport() {
2661            return this.schedulerSupport;
2662        }
2663    
2664        /**
2665         * @param schedulerSupport the schedulerSupport to set
2666         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2667         */
2668        public void setSchedulerSupport(boolean schedulerSupport) {
2669            this.schedulerSupport = schedulerSupport;
2670        }
2671    
2672        /**
2673         * @return the schedulerDirectory
2674         */
2675        public File getSchedulerDirectoryFile() {
2676            if (this.schedulerDirectoryFile == null) {
2677                this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2678            }
2679            return schedulerDirectoryFile;
2680        }
2681    
2682        /**
2683         * @param schedulerDirectory the schedulerDirectory to set
2684         */
2685        public void setSchedulerDirectoryFile(File schedulerDirectory) {
2686            this.schedulerDirectoryFile = schedulerDirectory;
2687        }
2688    
2689        public void setSchedulerDirectory(String schedulerDirectory) {
2690            setSchedulerDirectoryFile(new File(schedulerDirectory));
2691        }
2692    
2693        public int getSchedulePeriodForDestinationPurge() {
2694            return this.schedulePeriodForDestinationPurge;
2695        }
2696    
2697        public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
2698            this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
2699        }
2700    
2701        public int getMaxPurgedDestinationsPerSweep() {
2702            return this.maxPurgedDestinationsPerSweep;
2703        }
2704    
2705        public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
2706            this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
2707        }
2708    
2709        public BrokerContext getBrokerContext() {
2710            return brokerContext;
2711        }
2712    
2713        public void setBrokerContext(BrokerContext brokerContext) {
2714            this.brokerContext = brokerContext;
2715        }
2716    
2717        public void setBrokerId(String brokerId) {
2718            this.brokerId = new BrokerId(brokerId);
2719        }
2720    
2721        public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
2722            return useAuthenticatedPrincipalForJMSXUserID;
2723        }
2724    
2725        public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
2726            this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
2727        }
2728    
2729        /**
2730         * Should MBeans that support showing the Authenticated User Name information have this
2731         * value filled in or not.
2732         *
2733         * @return true if user names should be exposed in MBeans
2734         */
2735        public boolean isPopulateUserNameInMBeans() {
2736            return this.populateUserNameInMBeans;
2737        }
2738    
2739        /**
2740         * Sets whether Authenticated User Name information is shown in MBeans that support this field.
2741         * @param value if MBeans should expose user name information.
2742         */
2743        public void setPopulateUserNameInMBeans(boolean value) {
2744            this.populateUserNameInMBeans = value;
2745        }
2746    
2747        /**
2748         * Gets the time in Milliseconds that an invocation of an MBean method will wait before
2749         * failing.  The default value is to wait forever (zero).
2750         *
2751         * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
2752         */
2753        public long getMbeanInvocationTimeout() {
2754            return mbeanInvocationTimeout;
2755        }
2756    
2757        /**
2758         * Gets the time in Milliseconds that an invocation of an MBean method will wait before
2759         * failing. The default value is to wait forever (zero).
2760         *
2761         * @param mbeanInvocationTimeout
2762         *      timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
2763         */
2764        public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) {
2765            this.mbeanInvocationTimeout = mbeanInvocationTimeout;
2766        }
2767    
2768        public boolean isNetworkConnectorStartAsync() {
2769            return networkConnectorStartAsync;
2770        }
2771    
2772        public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
2773            this.networkConnectorStartAsync = networkConnectorStartAsync;
2774        }
2775    
2776        public boolean isAllowTempAutoCreationOnSend() {
2777            return allowTempAutoCreationOnSend;
2778        }
2779    
2780        /**
2781         * enable if temp destinations need to be propagated through a network when
2782         * advisorySupport==false. This is used in conjunction with the policy
2783         * gcInactiveDestinations for matching temps so they can get removed
2784         * when inactive
2785         *
2786         * @param allowTempAutoCreationOnSend
2787         */
2788        public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
2789            this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
2790        }
2791    
2792        public long getOfflineDurableSubscriberTimeout() {
2793            return offlineDurableSubscriberTimeout;
2794        }
2795    
2796        public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) {
2797            this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
2798        }
2799    
2800        public long getOfflineDurableSubscriberTaskSchedule() {
2801            return offlineDurableSubscriberTaskSchedule;
2802        }
2803    
2804        public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) {
2805            this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
2806        }
2807    
2808        public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
2809            return isUseVirtualTopics() && destination.isQueue() &&
2810                   getVirtualTopicConsumerDestinationFilter().matches(destination);
2811        }
2812    
2813        public Throwable getStartException() {
2814            return startException;
2815        }
2816    
2817        public boolean isStartAsync() {
2818            return startAsync;
2819        }
2820    
2821        public void setStartAsync(boolean startAsync) {
2822            this.startAsync = startAsync;
2823        }
2824    }