001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker;
018    
019    import java.io.BufferedReader;
020    import java.io.File;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.InputStreamReader;
024    import java.net.URI;
025    import java.net.URISyntaxException;
026    import java.net.UnknownHostException;
027    import java.util.ArrayList;
028    import java.util.Date;
029    import java.util.HashMap;
030    import java.util.HashSet;
031    import java.util.Iterator;
032    import java.util.List;
033    import java.util.Locale;
034    import java.util.Map;
035    import java.util.Set;
036    import java.util.concurrent.CopyOnWriteArrayList;
037    import java.util.concurrent.CountDownLatch;
038    import java.util.concurrent.LinkedBlockingQueue;
039    import java.util.concurrent.RejectedExecutionException;
040    import java.util.concurrent.RejectedExecutionHandler;
041    import java.util.concurrent.SynchronousQueue;
042    import java.util.concurrent.ThreadFactory;
043    import java.util.concurrent.ThreadPoolExecutor;
044    import java.util.concurrent.TimeUnit;
045    import java.util.concurrent.atomic.AtomicBoolean;
046    
047    import javax.annotation.PostConstruct;
048    import javax.annotation.PreDestroy;
049    import javax.management.MalformedObjectNameException;
050    import javax.management.ObjectName;
051    
052    import org.apache.activemq.ActiveMQConnectionMetaData;
053    import org.apache.activemq.ConfigurationException;
054    import org.apache.activemq.Service;
055    import org.apache.activemq.advisory.AdvisoryBroker;
056    import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
057    import org.apache.activemq.broker.ft.MasterConnector;
058    import org.apache.activemq.broker.jmx.AnnotatedMBean;
059    import org.apache.activemq.broker.jmx.BrokerView;
060    import org.apache.activemq.broker.jmx.ConnectorView;
061    import org.apache.activemq.broker.jmx.ConnectorViewMBean;
062    import org.apache.activemq.broker.jmx.FTConnectorView;
063    import org.apache.activemq.broker.jmx.JmsConnectorView;
064    import org.apache.activemq.broker.jmx.JobSchedulerView;
065    import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
066    import org.apache.activemq.broker.jmx.ManagedRegionBroker;
067    import org.apache.activemq.broker.jmx.ManagementContext;
068    import org.apache.activemq.broker.jmx.NetworkConnectorView;
069    import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
070    import org.apache.activemq.broker.jmx.ProxyConnectorView;
071    import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
072    import org.apache.activemq.broker.region.Destination;
073    import org.apache.activemq.broker.region.DestinationFactory;
074    import org.apache.activemq.broker.region.DestinationFactoryImpl;
075    import org.apache.activemq.broker.region.DestinationInterceptor;
076    import org.apache.activemq.broker.region.RegionBroker;
077    import org.apache.activemq.broker.region.policy.PolicyMap;
078    import org.apache.activemq.broker.region.virtual.MirroredQueue;
079    import org.apache.activemq.broker.region.virtual.VirtualDestination;
080    import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
081    import org.apache.activemq.broker.region.virtual.VirtualTopic;
082    import org.apache.activemq.broker.scheduler.SchedulerBroker;
083    import org.apache.activemq.command.ActiveMQDestination;
084    import org.apache.activemq.command.ActiveMQQueue;
085    import org.apache.activemq.command.BrokerId;
086    import org.apache.activemq.filter.DestinationFilter;
087    import org.apache.activemq.network.ConnectionFilter;
088    import org.apache.activemq.network.DiscoveryNetworkConnector;
089    import org.apache.activemq.network.NetworkConnector;
090    import org.apache.activemq.network.jms.JmsConnector;
091    import org.apache.activemq.proxy.ProxyConnector;
092    import org.apache.activemq.security.MessageAuthorizationPolicy;
093    import org.apache.activemq.selector.SelectorParser;
094    import org.apache.activemq.store.PersistenceAdapter;
095    import org.apache.activemq.store.PersistenceAdapterFactory;
096    import org.apache.activemq.store.amq.AMQPersistenceAdapter;
097    import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
098    import org.apache.activemq.store.kahadb.plist.PListStore;
099    import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
100    import org.apache.activemq.thread.Scheduler;
101    import org.apache.activemq.thread.TaskRunnerFactory;
102    import org.apache.activemq.transport.TransportFactory;
103    import org.apache.activemq.transport.TransportServer;
104    import org.apache.activemq.transport.stomp.ProtocolConverter;
105    import org.apache.activemq.transport.vm.VMTransportFactory;
106    import org.apache.activemq.usage.SystemUsage;
107    import org.apache.activemq.util.BrokerSupport;
108    import org.apache.activemq.util.DefaultIOExceptionHandler;
109    import org.apache.activemq.util.IOExceptionHandler;
110    import org.apache.activemq.util.IOExceptionSupport;
111    import org.apache.activemq.util.IOHelper;
112    import org.apache.activemq.util.InetAddressUtil;
113    import org.apache.activemq.util.JMXSupport;
114    import org.apache.activemq.util.ServiceStopper;
115    import org.apache.activemq.util.ThreadPoolUtils;
116    import org.apache.activemq.util.TimeUtils;
117    import org.apache.activemq.util.URISupport;
118    import org.slf4j.Logger;
119    import org.slf4j.LoggerFactory;
120    import org.slf4j.MDC;
121    
122    /**
123     * Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a
124     * number of transport connectors, network connectors and a bunch of properties
125     * which can be used to configure the broker as its lazily created.
126     *
127     *
128     * @org.apache.xbean.XBean
129     */
130    public class BrokerService implements Service {
131        protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
132        public static final String DEFAULT_PORT = "61616";
133        public static final String LOCAL_HOST_NAME;
134        public static final String BROKER_VERSION;
135        public static final String DEFAULT_BROKER_NAME = "localhost";
136        private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
137        private static final long serialVersionUID = 7353129142305630237L;
138        private boolean useJmx = true;
139        private boolean enableStatistics = true;
140        private boolean persistent = true;
141        private boolean populateJMSXUserID;
142        private boolean useAuthenticatedPrincipalForJMSXUserID;
143        private boolean populateUserNameInMBeans;
144        private long mbeanInvocationTimeout = 0;
145    
146        private boolean useShutdownHook = true;
147        private boolean useLoggingForShutdownErrors;
148        private boolean shutdownOnMasterFailure;
149        private boolean shutdownOnSlaveFailure;
150        private boolean waitForSlave;
151        private long waitForSlaveTimeout = 600000L;
152        private boolean passiveSlave;
153        private String brokerName = DEFAULT_BROKER_NAME;
154        private File dataDirectoryFile;
155        private File tmpDataDirectory;
156        private Broker broker;
157        private BrokerView adminView;
158        private ManagementContext managementContext;
159        private ObjectName brokerObjectName;
160        private TaskRunnerFactory taskRunnerFactory;
161        private TaskRunnerFactory persistenceTaskRunnerFactory;
162        private SystemUsage systemUsage;
163        private SystemUsage producerSystemUsage;
164        private SystemUsage consumerSystemUsaage;
165        private PersistenceAdapter persistenceAdapter;
166        private PersistenceAdapterFactory persistenceFactory;
167        protected DestinationFactory destinationFactory;
168        private MessageAuthorizationPolicy messageAuthorizationPolicy;
169        private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
170        private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
171        private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
172        private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
173        private final List<Service> services = new ArrayList<Service>();
174        private MasterConnector masterConnector;
175        private String masterConnectorURI;
176        private transient Thread shutdownHook;
177        private String[] transportConnectorURIs;
178        private String[] networkConnectorURIs;
179        private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
180        // to other jms messaging
181        // systems
182        private boolean deleteAllMessagesOnStartup;
183        private boolean advisorySupport = true;
184        private URI vmConnectorURI;
185        private String defaultSocketURIString;
186        private PolicyMap destinationPolicy;
187        private final AtomicBoolean started = new AtomicBoolean(false);
188        private final AtomicBoolean stopped = new AtomicBoolean(false);
189        private final AtomicBoolean stopping = new AtomicBoolean(false);
190        private BrokerPlugin[] plugins;
191        private boolean keepDurableSubsActive = true;
192        private boolean useVirtualTopics = true;
193        private boolean useMirroredQueues = false;
194        private boolean useTempMirroredQueues = true;
195        private BrokerId brokerId;
196        private DestinationInterceptor[] destinationInterceptors;
197        private ActiveMQDestination[] destinations;
198        private PListStore tempDataStore;
199        private int persistenceThreadPriority = Thread.MAX_PRIORITY;
200        private boolean useLocalHostBrokerName;
201        private final CountDownLatch stoppedLatch = new CountDownLatch(1);
202        private final CountDownLatch startedLatch = new CountDownLatch(1);
203        private boolean supportFailOver;
204        private Broker regionBroker;
205        private int producerSystemUsagePortion = 60;
206        private int consumerSystemUsagePortion = 40;
207        private boolean splitSystemUsageForProducersConsumers;
208        private boolean monitorConnectionSplits = false;
209        private int taskRunnerPriority = Thread.NORM_PRIORITY;
210        private boolean dedicatedTaskRunner;
211        private boolean cacheTempDestinations = false;// useful for failover
212        private int timeBeforePurgeTempDestinations = 5000;
213        private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
214        private boolean systemExitOnShutdown;
215        private int systemExitOnShutdownExitCode;
216        private SslContext sslContext;
217        private boolean forceStart = false;
218        private IOExceptionHandler ioExceptionHandler;
219        private boolean schedulerSupport = false;
220        private File schedulerDirectoryFile;
221        private Scheduler scheduler;
222        private ThreadPoolExecutor executor;
223        private boolean slave = true;
224        private int schedulePeriodForDestinationPurge= 0;
225        private int maxPurgedDestinationsPerSweep = 0;
226        private BrokerContext brokerContext;
227        private boolean networkConnectorStartAsync = false;
228        private boolean allowTempAutoCreationOnSend;
229    
230        private int offlineDurableSubscriberTimeout = -1;
231        private int offlineDurableSubscriberTaskSchedule = 300000;
232        private DestinationFilter virtualConsumerDestinationFilter;
233    
234        private final Object persistenceAdapterLock = new Object();
235        private Throwable startException = null;
236        private boolean startAsync = false;
237        private Date startDate;
238    
239        static {
240            String localHostName = "localhost";
241            try {
242                localHostName =  InetAddressUtil.getLocalHostName();
243            } catch (UnknownHostException e) {
244                LOG.error("Failed to resolve localhost");
245            }
246            LOCAL_HOST_NAME = localHostName;
247    
248            InputStream in = null;
249            String version = null;
250            if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
251                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
252                try {
253                    version = reader.readLine();
254                } catch(Exception e) {
255                }
256            }
257            BROKER_VERSION = version;
258        }
259    
260        @Override
261        public String toString() {
262            return "BrokerService[" + getBrokerName() + "]";
263        }
264    
265        private String getBrokerVersion() {
266            String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
267            if (version == null) {
268                version = BROKER_VERSION;
269            }
270    
271            return version;
272        }
273    
274        /**
275         * Adds a new transport connector for the given bind address
276         *
277         * @return the newly created and added transport connector
278         * @throws Exception
279         */
280        public TransportConnector addConnector(String bindAddress) throws Exception {
281            return addConnector(new URI(bindAddress));
282        }
283    
284        /**
285         * Adds a new transport connector for the given bind address
286         *
287         * @return the newly created and added transport connector
288         * @throws Exception
289         */
290        public TransportConnector addConnector(URI bindAddress) throws Exception {
291            return addConnector(createTransportConnector(bindAddress));
292        }
293    
294        /**
295         * Adds a new transport connector for the given TransportServer transport
296         *
297         * @return the newly created and added transport connector
298         * @throws Exception
299         */
300        public TransportConnector addConnector(TransportServer transport) throws Exception {
301            return addConnector(new TransportConnector(transport));
302        }
303    
304        /**
305         * Adds a new transport connector
306         *
307         * @return the transport connector
308         * @throws Exception
309         */
310        public TransportConnector addConnector(TransportConnector connector) throws Exception {
311            transportConnectors.add(connector);
312            return connector;
313        }
314    
315        /**
316         * Stops and removes a transport connector from the broker.
317         *
318         * @param connector
319         * @return true if the connector has been previously added to the broker
320         * @throws Exception
321         */
322        public boolean removeConnector(TransportConnector connector) throws Exception {
323            boolean rc = transportConnectors.remove(connector);
324            if (rc) {
325                unregisterConnectorMBean(connector);
326            }
327            return rc;
328        }
329    
330        /**
331         * Adds a new network connector using the given discovery address
332         *
333         * @return the newly created and added network connector
334         * @throws Exception
335         */
336        public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
337            return addNetworkConnector(new URI(discoveryAddress));
338        }
339    
340        /**
341         * Adds a new proxy connector using the given bind address
342         *
343         * @return the newly created and added network connector
344         * @throws Exception
345         */
346        public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
347            return addProxyConnector(new URI(bindAddress));
348        }
349    
350        /**
351         * Adds a new network connector using the given discovery address
352         *
353         * @return the newly created and added network connector
354         * @throws Exception
355         */
356        public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
357            NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
358            return addNetworkConnector(connector);
359        }
360    
361        /**
362         * Adds a new proxy connector using the given bind address
363         *
364         * @return the newly created and added network connector
365         * @throws Exception
366         */
367        public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
368            ProxyConnector connector = new ProxyConnector();
369            connector.setBind(bindAddress);
370            connector.setRemote(new URI("fanout:multicast://default"));
371            return addProxyConnector(connector);
372        }
373    
374        /**
375         * Adds a new network connector to connect this broker to a federated
376         * network
377         */
378        public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
379            connector.setBrokerService(this);
380            URI uri = getVmConnectorURI();
381            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
382            map.put("network", "true");
383            uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
384            connector.setLocalUri(uri);
385            // Set a connection filter so that the connector does not establish loop
386            // back connections.
387            connector.setConnectionFilter(new ConnectionFilter() {
388                public boolean connectTo(URI location) {
389                    List<TransportConnector> transportConnectors = getTransportConnectors();
390                    for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
391                        try {
392                            TransportConnector tc = iter.next();
393                            if (location.equals(tc.getConnectUri())) {
394                                return false;
395                            }
396                        } catch (Throwable e) {
397                        }
398                    }
399                    return true;
400                }
401            });
402            networkConnectors.add(connector);
403            if (isUseJmx()) {
404                registerNetworkConnectorMBean(connector);
405            }
406            return connector;
407        }
408    
409        /**
410         * Removes the given network connector without stopping it. The caller
411         * should call {@link NetworkConnector#stop()} to close the connector
412         */
413        public boolean removeNetworkConnector(NetworkConnector connector) {
414            boolean answer = networkConnectors.remove(connector);
415            if (answer) {
416                unregisterNetworkConnectorMBean(connector);
417            }
418            return answer;
419        }
420    
421        public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
422            URI uri = getVmConnectorURI();
423            connector.setLocalUri(uri);
424            proxyConnectors.add(connector);
425            if (isUseJmx()) {
426                registerProxyConnectorMBean(connector);
427            }
428            return connector;
429        }
430    
431        public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
432            connector.setBrokerService(this);
433            jmsConnectors.add(connector);
434            if (isUseJmx()) {
435                registerJmsConnectorMBean(connector);
436            }
437            return connector;
438        }
439    
440        public JmsConnector removeJmsConnector(JmsConnector connector) {
441            if (jmsConnectors.remove(connector)) {
442                return connector;
443            }
444            return null;
445        }
446    
447        /**
448         * @return Returns the masterConnectorURI.
449         */
450        public String getMasterConnectorURI() {
451            return masterConnectorURI;
452        }
453    
454        /**
455         * @param masterConnectorURI
456         *            The masterConnectorURI to set.
457         */
458        public void setMasterConnectorURI(String masterConnectorURI) {
459            this.masterConnectorURI = masterConnectorURI;
460        }
461    
462        /**
463         * @return true if this Broker is a slave to a Master
464         */
465        public boolean isSlave() {
466            return (masterConnector != null && masterConnector.isSlave()) ||
467                (masterConnector != null && masterConnector.isStoppedBeforeStart()) ||
468                (masterConnector == null && slave);
469        }
470    
471        public void masterFailed() {
472            if (shutdownOnMasterFailure) {
473                LOG.error("The Master has failed ... shutting down");
474                try {
475                    stop();
476                } catch (Exception e) {
477                    LOG.error("Failed to stop for master failure", e);
478                }
479            } else {
480                LOG.warn("Master Failed - starting all connectors");
481                try {
482                    startAllConnectors();
483                    broker.nowMasterBroker();
484                } catch (Exception e) {
485                    LOG.error("Failed to startAllConnectors", e);
486                }
487            }
488        }
489    
490        public String getUptime() {
491            // compute and log uptime
492            if (startDate == null) {
493                return "not started";
494            }
495            long delta = new Date().getTime() - startDate.getTime();
496            return TimeUtils.printDuration(delta);
497        }
498    
499        public boolean isStarted() {
500            return started.get();
501        }
502    
503        /**
504         * Forces a start of the broker.
505         * By default a BrokerService instance that was
506         * previously stopped using BrokerService.stop() cannot be restarted
507         * using BrokerService.start().
508         * This method enforces a restart.
509         * It is not recommended to force a restart of the broker and will not work
510         * for most but some very trivial broker configurations.
511         * For restarting a broker instance we recommend to first call stop() on
512         * the old instance and then recreate a new BrokerService instance.
513         *
514         * @param force - if true enforces a restart.
515         * @throws Exception
516         */
517        public void start(boolean force) throws Exception {
518            forceStart = force;
519            stopped.set(false);
520            started.set(false);
521            start();
522        }
523    
524        // Service interface
525        // -------------------------------------------------------------------------
526    
527        protected boolean shouldAutostart() {
528            return true;
529        }
530    
531        /**
532         *
533         * @throws Exception
534         * @org. apache.xbean.InitMethod
535         */
536        @PostConstruct
537        public void autoStart() throws Exception {
538            if(shouldAutostart()) {
539                start();
540            }
541        }
542    
543        public void start() throws Exception {
544            if (stopped.get() || !started.compareAndSet(false, true)) {
545                // lets just ignore redundant start() calls
546                // as its way too easy to not be completely sure if start() has been
547                // called or not with the gazillion of different configuration
548                // mechanisms
549                // throw new IllegalStateException("Already started.");
550                return;
551            }
552    
553            stopping.set(false);
554            startDate = new Date();
555            MDC.put("activemq.broker", brokerName);
556    
557            try {
558                if (systemExitOnShutdown && useShutdownHook) {
559                    throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
560                }
561                processHelperProperties();
562                if (isUseJmx()) {
563                    // need to remove MDC during starting JMX, as that would otherwise causes leaks, as spawned threads inheirt the MDC and
564                    // we cannot cleanup clear that during shutdown of the broker.
565                    MDC.remove("activemq.broker");
566                    try {
567                        startManagementContext();
568                    } finally {
569                        MDC.put("activemq.broker", brokerName);
570                    }
571                }
572                // in jvm master slave, lets not publish over existing broker till we get the lock
573                final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
574                if (brokerRegistry.lookup(getBrokerName()) == null) {
575                    brokerRegistry.bind(getBrokerName(), BrokerService.this);
576                }
577                startPersistenceAdapter(startAsync);
578                startBroker(startAsync);
579                brokerRegistry.bind(getBrokerName(), BrokerService.this);
580            } catch (Exception e) {
581                LOG.error("Failed to start Apache ActiveMQ (" + getBrokerName() + ", " + brokerId + "). Reason: " + e, e);
582                try {
583                    if (!stopped.get()) {
584                        stop();
585                    }
586                } catch (Exception ex) {
587                    LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
588                }
589                throw e;
590            } finally {
591                MDC.remove("activemq.broker");
592            }
593        }
594    
595        private void startPersistenceAdapter(boolean async) throws Exception {
596            if (async) {
597                new Thread("Persistence Adapter Starting Thread") {
598                    @Override
599                    public void run() {
600                        try {
601                            doStartPersistenceAdapter();
602                        } catch (Throwable e) {
603                            startException = e;
604                        } finally {
605                            synchronized (persistenceAdapterLock) {
606                                persistenceAdapterLock.notifyAll();
607                            }
608                        }
609                    }
610                }.start();
611            } else {
612                doStartPersistenceAdapter();
613            }
614        }
615    
616        private void doStartPersistenceAdapter() throws Exception {
617            getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
618            getPersistenceAdapter().setBrokerName(getBrokerName());
619            LOG.info("Using Persistence Adapter: " + getPersistenceAdapter());
620            if (deleteAllMessagesOnStartup) {
621                deleteAllMessages();
622            }
623            getPersistenceAdapter().start();
624        }
625    
626        private void startBroker(boolean async) throws Exception {
627            if (async) {
628                new Thread("Broker Starting Thread") {
629                    @Override
630                    public void run() {
631                        try {
632                            synchronized (persistenceAdapterLock) {
633                                persistenceAdapterLock.wait();
634                            }
635                            doStartBroker();
636                        } catch (Throwable t) {
637                            startException = t;
638                        }
639                    }
640                }.start();
641            } else {
642                doStartBroker();
643            }
644        }
645    
646        private void doStartBroker() throws Exception {
647            if (startException != null) {
648                return;
649            }
650            slave = false;
651            startDestinations();
652            addShutdownHook();
653    
654            broker = getBroker();
655            brokerId = broker.getBrokerId();
656    
657            // need to log this after creating the broker so we have its id and name
658            if (LOG.isInfoEnabled()) {
659                LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
660                        + getBrokerName() + ", " + brokerId + ") is starting");
661            }
662            broker.start();
663    
664            if (isUseJmx()) {
665                if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
666                    // try to restart management context
667                    // typical for slaves that use the same ports as master
668                    managementContext.stop();
669                    startManagementContext();
670                }
671                ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
672                managedBroker.setContextBroker(broker);
673                adminView.setBroker(managedBroker);
674            }
675            // see if there is a MasterBroker service and if so, configure
676            // it and start it.
677            for (Service service : services) {
678                if (service instanceof MasterConnector) {
679                    configureService(service);
680                    service.start();
681                }
682            }
683            if (!isSlave() && (masterConnector == null || isShutdownOnMasterFailure() == false)) {
684                startAllConnectors();
685            }
686            if (!stopped.get()) {
687                if (isUseJmx() && masterConnector != null) {
688                    registerFTConnectorMBean(masterConnector);
689                }
690            }
691            if (ioExceptionHandler == null) {
692                setIoExceptionHandler(new DefaultIOExceptionHandler());
693            }
694    
695            if (LOG.isInfoEnabled()) {
696                LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
697                        + getBrokerName() + ", " + brokerId + ") started");
698                LOG.info("For help or more information please see: http://activemq.apache.org");
699            }
700    
701            getBroker().brokerServiceStarted();
702            checkSystemUsageLimits();
703            startedLatch.countDown();
704        }
705    
706        /**
707         *
708         * @throws Exception
709         * @org.apache .xbean.DestroyMethod
710         */
711        @PreDestroy
712        public void stop() throws Exception {
713            if (!stopping.compareAndSet(false, true)) {
714                LOG.trace("Broker already stopping/stopped");
715                return;
716            }
717    
718            MDC.put("activemq.broker", brokerName);
719    
720            if (systemExitOnShutdown) {
721                new Thread() {
722                    @Override
723                    public void run() {
724                        System.exit(systemExitOnShutdownExitCode);
725                    }
726                }.start();
727            }
728    
729            if (LOG.isInfoEnabled()) {
730                LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
731                        + getBrokerName() + ", " + brokerId + ") is shutting down");
732            }
733    
734            removeShutdownHook();
735            if (this.scheduler != null) {
736                this.scheduler.stop();
737                this.scheduler = null;
738            }
739            ServiceStopper stopper = new ServiceStopper();
740            if (services != null) {
741                for (Service service : services) {
742                    stopper.stop(service);
743                }
744            }
745            stopAllConnectors(stopper);
746            // remove any VMTransports connected
747            // this has to be done after services are stopped,
748            // to avoid timing issue with discovery (spinning up a new instance)
749            BrokerRegistry.getInstance().unbind(getBrokerName());
750            VMTransportFactory.stopped(getBrokerName());
751            if (broker != null) {
752                stopper.stop(broker);
753                broker = null;
754            }
755    
756            if (tempDataStore != null) {
757                tempDataStore.stop();
758                tempDataStore = null;
759            }
760            try {
761                stopper.stop(persistenceAdapter);
762                persistenceAdapter = null;
763                slave = true;
764                if (isUseJmx()) {
765                    stopper.stop(getManagementContext());
766                    managementContext = null;
767                }
768                // Clear SelectorParser cache to free memory
769                SelectorParser.clearCache();
770            } finally {
771                stopped.set(true);
772                stoppedLatch.countDown();
773            }
774            if (masterConnectorURI == null) {
775                // master start has not finished yet
776                if (slaveStartSignal.getCount() == 1) {
777                    started.set(false);
778                    slaveStartSignal.countDown();
779                }
780            } else {
781                for (Service service : services) {
782                    if (service instanceof MasterConnector) {
783                        MasterConnector mConnector = (MasterConnector) service;
784                        if (!mConnector.isSlave()) {
785                            // means should be slave but not connected to master yet
786                            started.set(false);
787                            mConnector.stopBeforeConnected();
788                        }
789                    }
790                }
791            }
792            if (this.taskRunnerFactory != null) {
793                this.taskRunnerFactory.shutdown();
794                this.taskRunnerFactory = null;
795            }
796            if (this.executor != null) {
797                ThreadPoolUtils.shutdownNow(executor);
798                this.executor = null;
799            }
800    
801            this.destinationInterceptors = null;
802            this.destinationFactory = null;
803    
804            if (LOG.isInfoEnabled()) {
805                if (startDate != null) {
806                    LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
807                            + getBrokerName() + ", " + brokerId + ") uptime " + getUptime());
808                }
809                LOG.info("Apache ActiveMQ " + getBrokerVersion() + " ("
810                        + getBrokerName() + ", " + brokerId + ") is shutdown");
811            }
812    
813            synchronized (shutdownHooks) {
814                for (Runnable hook : shutdownHooks) {
815                    try {
816                        hook.run();
817                    } catch (Throwable e) {
818                        stopper.onException(hook, e);
819                    }
820                }
821            }
822    
823            MDC.remove("activemq.broker");
824    
825            // and clear start date
826            startDate = null;
827    
828            stopper.throwFirstException();
829        }
830    
831        public boolean checkQueueSize(String queueName) {
832            long count = 0;
833            long queueSize = 0;
834            Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
835            for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
836                if (entry.getKey().isQueue()) {
837                    if (entry.getValue().getName().matches(queueName)) {
838                        queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
839                        count += queueSize;
840                        if (queueSize > 0) {
841                            LOG.info("Queue has pending message:" + entry.getValue().getName() + " queueSize is:"
842                                    + queueSize);
843                        }
844                    }
845                }
846            }
847            return count == 0;
848        }
849    
850        /**
851         * This method (both connectorName and queueName are using regex to match)
852         * 1. stop the connector (supposed the user input the connector which the
853         * clients connect to) 2. to check whether there is any pending message on
854         * the queues defined by queueName 3. supposedly, after stop the connector,
855         * client should failover to other broker and pending messages should be
856         * forwarded. if no pending messages, the method finally call stop to stop
857         * the broker.
858         *
859         * @param connectorName
860         * @param queueName
861         * @param timeout
862         * @param pollInterval
863         * @throws Exception
864         */
865        public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
866                throws Exception {
867            if (isUseJmx()) {
868                if (connectorName == null || queueName == null || timeout <= 0) {
869                    throw new Exception(
870                            "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
871                }
872                if (pollInterval <= 0) {
873                    pollInterval = 30;
874                }
875                LOG.info("Stop gracefully with connectorName:" + connectorName + " queueName:" + queueName + " timeout:"
876                        + timeout + " pollInterval:" + pollInterval);
877                TransportConnector connector;
878                for (int i = 0; i < transportConnectors.size(); i++) {
879                    connector = transportConnectors.get(i);
880                    if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
881                        connector.stop();
882                    }
883                }
884                long start = System.currentTimeMillis();
885                while (System.currentTimeMillis() - start < timeout * 1000) {
886                    // check quesize until it gets zero
887                    if (checkQueueSize(queueName)) {
888                        stop();
889                        break;
890                    } else {
891                        Thread.sleep(pollInterval * 1000);
892                    }
893                }
894                if (stopped.get()) {
895                    LOG.info("Successfully stop the broker.");
896                } else {
897                    LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
898                }
899            }
900        }
901    
902        /**
903         * A helper method to block the caller thread until the broker has been
904         * stopped
905         */
906        public void waitUntilStopped() {
907            while (isStarted() && !stopped.get()) {
908                try {
909                    stoppedLatch.await();
910                } catch (InterruptedException e) {
911                    // ignore
912                }
913            }
914        }
915    
916        /**
917         * A helper method to block the caller thread until the broker has fully started
918         * @return boolean true if wait succeeded false if broker was not started or was stopped
919         */
920        public boolean waitUntilStarted() {
921            boolean waitSucceeded = false;
922            while (isStarted() && !stopped.get() && !waitSucceeded) {
923                try {
924                    if (startException != null) {
925                        return waitSucceeded;
926                    }
927                    waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
928                } catch (InterruptedException ignore) {
929                }
930            }
931            return waitSucceeded;
932        }
933    
934        // Properties
935        // -------------------------------------------------------------------------
936        /**
937         * Returns the message broker
938         */
939        public Broker getBroker() throws Exception {
940            if (broker == null) {
941                broker = createBroker();
942            }
943            return broker;
944        }
945    
946        /**
947         * Returns the administration view of the broker; used to create and destroy
948         * resources such as queues and topics. Note this method returns null if JMX
949         * is disabled.
950         */
951        public BrokerView getAdminView() throws Exception {
952            if (adminView == null) {
953                // force lazy creation
954                getBroker();
955            }
956            return adminView;
957        }
958    
959        public void setAdminView(BrokerView adminView) {
960            this.adminView = adminView;
961        }
962    
963        public String getBrokerName() {
964            return brokerName;
965        }
966    
967        /**
968         * Sets the name of this broker; which must be unique in the network
969         *
970         * @param brokerName
971         */
972        public void setBrokerName(String brokerName) {
973            if (brokerName == null) {
974                throw new NullPointerException("The broker name cannot be null");
975            }
976            String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
977            if (!str.equals(brokerName)) {
978                LOG.error("Broker Name: " + brokerName + " contained illegal characters - replaced with " + str);
979            }
980            this.brokerName = str.trim();
981        }
982    
983        public PersistenceAdapterFactory getPersistenceFactory() {
984            return persistenceFactory;
985        }
986    
987        public File getDataDirectoryFile() {
988            if (dataDirectoryFile == null) {
989                dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
990            }
991            return dataDirectoryFile;
992        }
993    
994        public File getBrokerDataDirectory() {
995            String brokerDir = getBrokerName();
996            return new File(getDataDirectoryFile(), brokerDir);
997        }
998    
999        /**
1000         * Sets the directory in which the data files will be stored by default for
1001         * the JDBC and Journal persistence adaptors.
1002         *
1003         * @param dataDirectory
1004         *            the directory to store data files
1005         */
1006        public void setDataDirectory(String dataDirectory) {
1007            setDataDirectoryFile(new File(dataDirectory));
1008        }
1009    
1010        /**
1011         * Sets the directory in which the data files will be stored by default for
1012         * the JDBC and Journal persistence adaptors.
1013         *
1014         * @param dataDirectoryFile
1015         *            the directory to store data files
1016         */
1017        public void setDataDirectoryFile(File dataDirectoryFile) {
1018            this.dataDirectoryFile = dataDirectoryFile;
1019        }
1020    
1021        /**
1022         * @return the tmpDataDirectory
1023         */
1024        public File getTmpDataDirectory() {
1025            if (tmpDataDirectory == null) {
1026                tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
1027            }
1028            return tmpDataDirectory;
1029        }
1030    
1031        /**
1032         * @param tmpDataDirectory
1033         *            the tmpDataDirectory to set
1034         */
1035        public void setTmpDataDirectory(File tmpDataDirectory) {
1036            this.tmpDataDirectory = tmpDataDirectory;
1037        }
1038    
1039        public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
1040            this.persistenceFactory = persistenceFactory;
1041        }
1042    
1043        public void setDestinationFactory(DestinationFactory destinationFactory) {
1044            this.destinationFactory = destinationFactory;
1045        }
1046    
1047        public boolean isPersistent() {
1048            return persistent;
1049        }
1050    
1051        /**
1052         * Sets whether or not persistence is enabled or disabled.
1053         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1054         */
1055        public void setPersistent(boolean persistent) {
1056            this.persistent = persistent;
1057        }
1058    
1059        public boolean isPopulateJMSXUserID() {
1060            return populateJMSXUserID;
1061        }
1062    
1063        /**
1064         * Sets whether or not the broker should populate the JMSXUserID header.
1065         */
1066        public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
1067            this.populateJMSXUserID = populateJMSXUserID;
1068        }
1069    
1070        public SystemUsage getSystemUsage() {
1071            try {
1072                if (systemUsage == null) {
1073                    systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore());
1074                    systemUsage.setExecutor(getExecutor());
1075                    systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // 64 MB
1076                    systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
1077                    systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
1078                    addService(this.systemUsage);
1079                }
1080                return systemUsage;
1081            } catch (IOException e) {
1082                LOG.error("Cannot create SystemUsage", e);
1083                throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
1084            }
1085        }
1086    
1087        public void setSystemUsage(SystemUsage memoryManager) {
1088            if (this.systemUsage != null) {
1089                removeService(this.systemUsage);
1090            }
1091            this.systemUsage = memoryManager;
1092            if (this.systemUsage.getExecutor()==null) {
1093                this.systemUsage.setExecutor(getExecutor());
1094            }
1095            addService(this.systemUsage);
1096        }
1097    
1098        /**
1099         * @return the consumerUsageManager
1100         * @throws IOException
1101         */
1102        public SystemUsage getConsumerSystemUsage() throws IOException {
1103            if (this.consumerSystemUsaage == null) {
1104                if (splitSystemUsageForProducersConsumers) {
1105                    this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
1106                    float portion = consumerSystemUsagePortion / 100f;
1107                    this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
1108                    addService(this.consumerSystemUsaage);
1109                } else {
1110                    consumerSystemUsaage = getSystemUsage();
1111                }
1112            }
1113            return this.consumerSystemUsaage;
1114        }
1115    
1116        /**
1117         * @param consumerSystemUsaage
1118         *            the storeSystemUsage to set
1119         */
1120        public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
1121            if (this.consumerSystemUsaage != null) {
1122                removeService(this.consumerSystemUsaage);
1123            }
1124            this.consumerSystemUsaage = consumerSystemUsaage;
1125            addService(this.consumerSystemUsaage);
1126        }
1127    
1128        /**
1129         * @return the producerUsageManager
1130         * @throws IOException
1131         */
1132        public SystemUsage getProducerSystemUsage() throws IOException {
1133            if (producerSystemUsage == null) {
1134                if (splitSystemUsageForProducersConsumers) {
1135                    producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
1136                    float portion = producerSystemUsagePortion / 100f;
1137                    producerSystemUsage.getMemoryUsage().setUsagePortion(portion);
1138                    addService(producerSystemUsage);
1139                } else {
1140                    producerSystemUsage = getSystemUsage();
1141                }
1142            }
1143            return producerSystemUsage;
1144        }
1145    
1146        /**
1147         * @param producerUsageManager
1148         *            the producerUsageManager to set
1149         */
1150        public void setProducerSystemUsage(SystemUsage producerUsageManager) {
1151            if (this.producerSystemUsage != null) {
1152                removeService(this.producerSystemUsage);
1153            }
1154            this.producerSystemUsage = producerUsageManager;
1155            addService(this.producerSystemUsage);
1156        }
1157    
1158        public PersistenceAdapter getPersistenceAdapter() throws IOException {
1159            if (persistenceAdapter == null) {
1160                persistenceAdapter = createPersistenceAdapter();
1161                configureService(persistenceAdapter);
1162                this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1163            }
1164            return persistenceAdapter;
1165        }
1166    
1167        /**
1168         * Sets the persistence adaptor implementation to use for this broker
1169         *
1170         * @throws IOException
1171         */
1172        public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
1173            if (!isPersistent() && ! (persistenceAdapter instanceof MemoryPersistenceAdapter)) {
1174                LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: " + persistenceAdapter);
1175                return;
1176            }
1177            this.persistenceAdapter = persistenceAdapter;
1178            configureService(this.persistenceAdapter);
1179            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
1180        }
1181    
1182        public TaskRunnerFactory getTaskRunnerFactory() {
1183            if (this.taskRunnerFactory == null) {
1184                this.taskRunnerFactory = new TaskRunnerFactory("ActiveMQ BrokerService["+getBrokerName()+"] Task", getTaskRunnerPriority(), true, 1000,
1185                        isDedicatedTaskRunner());
1186            }
1187            return this.taskRunnerFactory;
1188        }
1189    
1190        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
1191            this.taskRunnerFactory = taskRunnerFactory;
1192        }
1193    
1194        public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
1195            if (taskRunnerFactory == null) {
1196                persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
1197                        true, 1000, isDedicatedTaskRunner());
1198            }
1199            return persistenceTaskRunnerFactory;
1200        }
1201    
1202        public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
1203            this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
1204        }
1205    
1206        public boolean isUseJmx() {
1207            return useJmx;
1208        }
1209    
1210        public boolean isEnableStatistics() {
1211            return enableStatistics;
1212        }
1213    
1214        /**
1215         * Sets whether or not the Broker's services enable statistics or not.
1216         */
1217        public void setEnableStatistics(boolean enableStatistics) {
1218            this.enableStatistics = enableStatistics;
1219        }
1220    
1221        /**
1222         * Sets whether or not the Broker's services should be exposed into JMX or
1223         * not.
1224         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1225         */
1226        public void setUseJmx(boolean useJmx) {
1227            this.useJmx = useJmx;
1228        }
1229    
1230        public ObjectName getBrokerObjectName() throws IOException {
1231            if (brokerObjectName == null) {
1232                brokerObjectName = createBrokerObjectName();
1233            }
1234            return brokerObjectName;
1235        }
1236    
1237        /**
1238         * Sets the JMX ObjectName for this broker
1239         */
1240        public void setBrokerObjectName(ObjectName brokerObjectName) {
1241            this.brokerObjectName = brokerObjectName;
1242        }
1243    
1244        public ManagementContext getManagementContext() {
1245            if (managementContext == null) {
1246                managementContext = new ManagementContext();
1247            }
1248            return managementContext;
1249        }
1250    
1251        public void setManagementContext(ManagementContext managementContext) {
1252            this.managementContext = managementContext;
1253        }
1254    
1255        public NetworkConnector getNetworkConnectorByName(String connectorName) {
1256            for (NetworkConnector connector : networkConnectors) {
1257                if (connector.getName().equals(connectorName)) {
1258                    return connector;
1259                }
1260            }
1261            return null;
1262        }
1263    
1264        public String[] getNetworkConnectorURIs() {
1265            return networkConnectorURIs;
1266        }
1267    
1268        public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
1269            this.networkConnectorURIs = networkConnectorURIs;
1270        }
1271    
1272        public TransportConnector getConnectorByName(String connectorName) {
1273            for (TransportConnector connector : transportConnectors) {
1274                if (connector.getName().equals(connectorName)) {
1275                    return connector;
1276                }
1277            }
1278            return null;
1279        }
1280    
1281        public Map<String, String> getTransportConnectorURIsAsMap() {
1282            Map<String, String> answer = new HashMap<String, String>();
1283            for (TransportConnector connector : transportConnectors) {
1284                try {
1285                    URI uri = connector.getConnectUri();
1286                    if (uri != null) {
1287                        String scheme = uri.getScheme();
1288                        if (scheme != null) {
1289                            answer.put(scheme.toLowerCase(Locale.ENGLISH), uri.toString());
1290                        }
1291                    }
1292                } catch (Exception e) {
1293                    LOG.debug("Failed to read URI to build transportURIsAsMap", e);
1294                }
1295            }
1296            return answer;
1297        }
1298    
1299        public String[] getTransportConnectorURIs() {
1300            return transportConnectorURIs;
1301        }
1302    
1303        public void setTransportConnectorURIs(String[] transportConnectorURIs) {
1304            this.transportConnectorURIs = transportConnectorURIs;
1305        }
1306    
1307        /**
1308         * @return Returns the jmsBridgeConnectors.
1309         */
1310        public JmsConnector[] getJmsBridgeConnectors() {
1311            return jmsBridgeConnectors;
1312        }
1313    
1314        /**
1315         * @param jmsConnectors
1316         *            The jmsBridgeConnectors to set.
1317         */
1318        public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
1319            this.jmsBridgeConnectors = jmsConnectors;
1320        }
1321    
1322        public Service[] getServices() {
1323            return services.toArray(new Service[0]);
1324        }
1325    
1326        /**
1327         * Sets the services associated with this broker such as a
1328         * {@link MasterConnector}
1329         */
1330        public void setServices(Service[] services) {
1331            this.services.clear();
1332            if (services != null) {
1333                for (int i = 0; i < services.length; i++) {
1334                    this.services.add(services[i]);
1335                }
1336            }
1337        }
1338    
1339        /**
1340         * Adds a new service so that it will be started as part of the broker
1341         * lifecycle
1342         */
1343        public void addService(Service service) {
1344            services.add(service);
1345        }
1346    
1347        public void removeService(Service service) {
1348            services.remove(service);
1349        }
1350    
1351        public boolean isUseLoggingForShutdownErrors() {
1352            return useLoggingForShutdownErrors;
1353        }
1354    
1355        /**
1356         * Sets whether or not we should use commons-logging when reporting errors
1357         * when shutting down the broker
1358         */
1359        public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
1360            this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
1361        }
1362    
1363        public boolean isUseShutdownHook() {
1364            return useShutdownHook;
1365        }
1366    
1367        /**
1368         * Sets whether or not we should use a shutdown handler to close down the
1369         * broker cleanly if the JVM is terminated. It is recommended you leave this
1370         * enabled.
1371         */
1372        public void setUseShutdownHook(boolean useShutdownHook) {
1373            this.useShutdownHook = useShutdownHook;
1374        }
1375    
1376        public boolean isAdvisorySupport() {
1377            return advisorySupport;
1378        }
1379    
1380        /**
1381         * Allows the support of advisory messages to be disabled for performance
1382         * reasons.
1383         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1384         */
1385        public void setAdvisorySupport(boolean advisorySupport) {
1386            this.advisorySupport = advisorySupport;
1387        }
1388    
1389        public List<TransportConnector> getTransportConnectors() {
1390            return new ArrayList<TransportConnector>(transportConnectors);
1391        }
1392    
1393        /**
1394         * Sets the transport connectors which this broker will listen on for new
1395         * clients
1396         *
1397         * @org.apache.xbean.Property
1398         *                            nestedType="org.apache.activemq.broker.TransportConnector"
1399         */
1400        public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
1401            for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
1402                TransportConnector connector = iter.next();
1403                addConnector(connector);
1404            }
1405        }
1406    
1407        public TransportConnector getTransportConnectorByName(String name){
1408            for (TransportConnector transportConnector:transportConnectors){
1409               if (name.equals(transportConnector.getName())){
1410                   return transportConnector;
1411               }
1412            }
1413            return null;
1414        }
1415    
1416        public TransportConnector getTransportConnectorByScheme(String scheme){
1417            for (TransportConnector transportConnector:transportConnectors){
1418                if (scheme.equals(transportConnector.getUri().getScheme())){
1419                    return transportConnector;
1420                }
1421            }
1422            return null;
1423        }
1424    
1425        public List<NetworkConnector> getNetworkConnectors() {
1426            return new ArrayList<NetworkConnector>(networkConnectors);
1427        }
1428    
1429        public List<ProxyConnector> getProxyConnectors() {
1430            return new ArrayList<ProxyConnector>(proxyConnectors);
1431        }
1432    
1433        /**
1434         * Sets the network connectors which this broker will use to connect to
1435         * other brokers in a federated network
1436         *
1437         * @org.apache.xbean.Property
1438         *                            nestedType="org.apache.activemq.network.NetworkConnector"
1439         */
1440        public void setNetworkConnectors(List networkConnectors) throws Exception {
1441            for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
1442                NetworkConnector connector = (NetworkConnector) iter.next();
1443                addNetworkConnector(connector);
1444            }
1445        }
1446    
1447        /**
1448         * Sets the network connectors which this broker will use to connect to
1449         * other brokers in a federated network
1450         */
1451        public void setProxyConnectors(List proxyConnectors) throws Exception {
1452            for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
1453                ProxyConnector connector = (ProxyConnector) iter.next();
1454                addProxyConnector(connector);
1455            }
1456        }
1457    
1458        public PolicyMap getDestinationPolicy() {
1459            return destinationPolicy;
1460        }
1461    
1462        /**
1463         * Sets the destination specific policies available either for exact
1464         * destinations or for wildcard areas of destinations.
1465         */
1466        public void setDestinationPolicy(PolicyMap policyMap) {
1467            this.destinationPolicy = policyMap;
1468        }
1469    
1470        public BrokerPlugin[] getPlugins() {
1471            return plugins;
1472        }
1473    
1474        /**
1475         * Sets a number of broker plugins to install such as for security
1476         * authentication or authorization
1477         */
1478        public void setPlugins(BrokerPlugin[] plugins) {
1479            this.plugins = plugins;
1480        }
1481    
1482        public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
1483            return messageAuthorizationPolicy;
1484        }
1485    
1486        /**
1487         * Sets the policy used to decide if the current connection is authorized to
1488         * consume a given message
1489         */
1490        public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
1491            this.messageAuthorizationPolicy = messageAuthorizationPolicy;
1492        }
1493    
1494        /**
1495         * Delete all messages from the persistent store
1496         *
1497         * @throws IOException
1498         */
1499        public void deleteAllMessages() throws IOException {
1500            getPersistenceAdapter().deleteAllMessages();
1501        }
1502    
1503        public boolean isDeleteAllMessagesOnStartup() {
1504            return deleteAllMessagesOnStartup;
1505        }
1506    
1507        /**
1508         * Sets whether or not all messages are deleted on startup - mostly only
1509         * useful for testing.
1510         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
1511         */
1512        public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
1513            this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
1514        }
1515    
1516        public URI getVmConnectorURI() {
1517            if (vmConnectorURI == null) {
1518                try {
1519                    vmConnectorURI = new URI("vm://" + getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_"));
1520                } catch (URISyntaxException e) {
1521                    LOG.error("Badly formed URI from " + getBrokerName(), e);
1522                }
1523            }
1524            return vmConnectorURI;
1525        }
1526    
1527        public void setVmConnectorURI(URI vmConnectorURI) {
1528            this.vmConnectorURI = vmConnectorURI;
1529        }
1530    
1531        public String getDefaultSocketURIString() {
1532    
1533                if (started.get()) {
1534                    if (this.defaultSocketURIString == null) {
1535                        for (TransportConnector tc:this.transportConnectors) {
1536                            String result = null;
1537                            try {
1538                                result = tc.getPublishableConnectString();
1539                            } catch (Exception e) {
1540                              LOG.warn("Failed to get the ConnectURI for "+tc,e);
1541                            }
1542                            if (result != null) {
1543                                // find first publishable uri
1544                                if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
1545                                    this.defaultSocketURIString = result;
1546                                    break;
1547                                } else {
1548                                // or use the first defined
1549                                    if (this.defaultSocketURIString == null) {
1550                                        this.defaultSocketURIString = result;
1551                                    }
1552                                }
1553                            }
1554                        }
1555    
1556                    }
1557                    return this.defaultSocketURIString;
1558                }
1559           return null;
1560        }
1561    
1562        /**
1563         * @return Returns the shutdownOnMasterFailure.
1564         */
1565        public boolean isShutdownOnMasterFailure() {
1566            return shutdownOnMasterFailure;
1567        }
1568    
1569        /**
1570         * @param shutdownOnMasterFailure
1571         *            The shutdownOnMasterFailure to set.
1572         */
1573        public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
1574            this.shutdownOnMasterFailure = shutdownOnMasterFailure;
1575        }
1576    
1577        public boolean isKeepDurableSubsActive() {
1578            return keepDurableSubsActive;
1579        }
1580    
1581        public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
1582            this.keepDurableSubsActive = keepDurableSubsActive;
1583        }
1584    
1585        public boolean isUseVirtualTopics() {
1586            return useVirtualTopics;
1587        }
1588    
1589        /**
1590         * Sets whether or not <a
1591         * href="http://activemq.apache.org/virtual-destinations.html">Virtual
1592         * Topics</a> should be supported by default if they have not been
1593         * explicitly configured.
1594         */
1595        public void setUseVirtualTopics(boolean useVirtualTopics) {
1596            this.useVirtualTopics = useVirtualTopics;
1597        }
1598    
1599        public DestinationInterceptor[] getDestinationInterceptors() {
1600            return destinationInterceptors;
1601        }
1602    
1603        public boolean isUseMirroredQueues() {
1604            return useMirroredQueues;
1605        }
1606    
1607        /**
1608         * Sets whether or not <a
1609         * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
1610         * Queues</a> should be supported by default if they have not been
1611         * explicitly configured.
1612         */
1613        public void setUseMirroredQueues(boolean useMirroredQueues) {
1614            this.useMirroredQueues = useMirroredQueues;
1615        }
1616    
1617        /**
1618         * Sets the destination interceptors to use
1619         */
1620        public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
1621            this.destinationInterceptors = destinationInterceptors;
1622        }
1623    
1624        public ActiveMQDestination[] getDestinations() {
1625            return destinations;
1626        }
1627    
1628        /**
1629         * Sets the destinations which should be loaded/created on startup
1630         */
1631        public void setDestinations(ActiveMQDestination[] destinations) {
1632            this.destinations = destinations;
1633        }
1634    
1635        /**
1636         * @return the tempDataStore
1637         */
1638        public synchronized PListStore getTempDataStore() {
1639            if (tempDataStore == null) {
1640                if (!isPersistent()) {
1641                    return null;
1642                }
1643                boolean result = true;
1644                boolean empty = true;
1645                try {
1646                    File directory = getTmpDataDirectory();
1647                    if (directory.exists() && directory.isDirectory()) {
1648                        File[] files = directory.listFiles();
1649                        if (files != null && files.length > 0) {
1650                            empty = false;
1651                            for (int i = 0; i < files.length; i++) {
1652                                File file = files[i];
1653                                if (!file.isDirectory()) {
1654                                    result &= file.delete();
1655                                }
1656                            }
1657                        }
1658                    }
1659                    if (!empty) {
1660                        String str = result ? "Successfully deleted" : "Failed to delete";
1661                        LOG.info(str + " temporary storage");
1662                    }
1663                    this.tempDataStore = new PListStore();
1664                    this.tempDataStore.setDirectory(getTmpDataDirectory());
1665                    configureService(tempDataStore);
1666                    this.tempDataStore.start();
1667                } catch (Exception e) {
1668                    throw new RuntimeException(e);
1669                }
1670            }
1671            return tempDataStore;
1672        }
1673    
1674        /**
1675         * @param tempDataStore
1676         *            the tempDataStore to set
1677         */
1678        public void setTempDataStore(PListStore tempDataStore) {
1679            this.tempDataStore = tempDataStore;
1680            configureService(tempDataStore);
1681            try {
1682                tempDataStore.start();
1683            } catch (Exception e) {
1684                RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e);
1685                LOG.error(exception.getLocalizedMessage(), e);
1686                throw exception;
1687            }
1688        }
1689    
1690        public int getPersistenceThreadPriority() {
1691            return persistenceThreadPriority;
1692        }
1693    
1694        public void setPersistenceThreadPriority(int persistenceThreadPriority) {
1695            this.persistenceThreadPriority = persistenceThreadPriority;
1696        }
1697    
1698        /**
1699         * @return the useLocalHostBrokerName
1700         */
1701        public boolean isUseLocalHostBrokerName() {
1702            return this.useLocalHostBrokerName;
1703        }
1704    
1705        /**
1706         * @param useLocalHostBrokerName
1707         *            the useLocalHostBrokerName to set
1708         */
1709        public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
1710            this.useLocalHostBrokerName = useLocalHostBrokerName;
1711            if (useLocalHostBrokerName && !started.get() && brokerName == null || brokerName == DEFAULT_BROKER_NAME) {
1712                brokerName = LOCAL_HOST_NAME;
1713            }
1714        }
1715    
1716        /**
1717         * @return the supportFailOver
1718         */
1719        public boolean isSupportFailOver() {
1720            return this.supportFailOver;
1721        }
1722    
1723        /**
1724         * @param supportFailOver
1725         *            the supportFailOver to set
1726         */
1727        public void setSupportFailOver(boolean supportFailOver) {
1728            this.supportFailOver = supportFailOver;
1729        }
1730    
1731        /**
1732         * Looks up and lazily creates if necessary the destination for the given
1733         * JMS name
1734         */
1735        public Destination getDestination(ActiveMQDestination destination) throws Exception {
1736            return getBroker().addDestination(getAdminConnectionContext(), destination,false);
1737        }
1738    
1739        public void removeDestination(ActiveMQDestination destination) throws Exception {
1740            getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
1741        }
1742    
1743        public int getProducerSystemUsagePortion() {
1744            return producerSystemUsagePortion;
1745        }
1746    
1747        public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
1748            this.producerSystemUsagePortion = producerSystemUsagePortion;
1749        }
1750    
1751        public int getConsumerSystemUsagePortion() {
1752            return consumerSystemUsagePortion;
1753        }
1754    
1755        public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
1756            this.consumerSystemUsagePortion = consumerSystemUsagePortion;
1757        }
1758    
1759        public boolean isSplitSystemUsageForProducersConsumers() {
1760            return splitSystemUsageForProducersConsumers;
1761        }
1762    
1763        public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
1764            this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
1765        }
1766    
1767        public boolean isMonitorConnectionSplits() {
1768            return monitorConnectionSplits;
1769        }
1770    
1771        public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
1772            this.monitorConnectionSplits = monitorConnectionSplits;
1773        }
1774    
1775        public int getTaskRunnerPriority() {
1776            return taskRunnerPriority;
1777        }
1778    
1779        public void setTaskRunnerPriority(int taskRunnerPriority) {
1780            this.taskRunnerPriority = taskRunnerPriority;
1781        }
1782    
1783        public boolean isDedicatedTaskRunner() {
1784            return dedicatedTaskRunner;
1785        }
1786    
1787        public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
1788            this.dedicatedTaskRunner = dedicatedTaskRunner;
1789        }
1790    
1791        public boolean isCacheTempDestinations() {
1792            return cacheTempDestinations;
1793        }
1794    
1795        public void setCacheTempDestinations(boolean cacheTempDestinations) {
1796            this.cacheTempDestinations = cacheTempDestinations;
1797        }
1798    
1799        public int getTimeBeforePurgeTempDestinations() {
1800            return timeBeforePurgeTempDestinations;
1801        }
1802    
1803        public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
1804            this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
1805        }
1806    
1807        public boolean isUseTempMirroredQueues() {
1808            return useTempMirroredQueues;
1809        }
1810    
1811        public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
1812            this.useTempMirroredQueues = useTempMirroredQueues;
1813        }
1814    
1815        //
1816        // Implementation methods
1817        // -------------------------------------------------------------------------
1818        /**
1819         * Handles any lazy-creation helper properties which are added to make
1820         * things easier to configure inside environments such as Spring
1821         *
1822         * @throws Exception
1823         */
1824        protected void processHelperProperties() throws Exception {
1825            boolean masterServiceExists = false;
1826            if (transportConnectorURIs != null) {
1827                for (int i = 0; i < transportConnectorURIs.length; i++) {
1828                    String uri = transportConnectorURIs[i];
1829                    addConnector(uri);
1830                }
1831            }
1832            if (networkConnectorURIs != null) {
1833                for (int i = 0; i < networkConnectorURIs.length; i++) {
1834                    String uri = networkConnectorURIs[i];
1835                    addNetworkConnector(uri);
1836                }
1837            }
1838            if (jmsBridgeConnectors != null) {
1839                for (int i = 0; i < jmsBridgeConnectors.length; i++) {
1840                    addJmsConnector(jmsBridgeConnectors[i]);
1841                }
1842            }
1843            for (Service service : services) {
1844                if (service instanceof MasterConnector) {
1845                    masterServiceExists = true;
1846                    break;
1847                }
1848            }
1849            if (masterConnectorURI != null) {
1850                if (masterServiceExists) {
1851                    throw new IllegalStateException(
1852                            "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
1853                } else {
1854                    addService(new MasterConnector(masterConnectorURI));
1855                }
1856            }
1857        }
1858    
1859        protected void checkSystemUsageLimits() throws IOException {
1860            SystemUsage usage = getSystemUsage();
1861            long memLimit = usage.getMemoryUsage().getLimit();
1862            long jvmLimit = Runtime.getRuntime().maxMemory();
1863    
1864            if (memLimit > jvmLimit) {
1865                LOG.error("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
1866                          " mb) is more than the maximum available for the JVM: " +
1867                          jvmLimit / (1024 * 1024) + " mb");
1868            }
1869    
1870            if (getPersistenceAdapter() != null) {
1871                PersistenceAdapter adapter = getPersistenceAdapter();
1872                File dir = adapter.getDirectory();
1873    
1874                if (dir != null) {
1875                    String dirPath = dir.getAbsolutePath();
1876                    if (!dir.isAbsolute()) {
1877                        dir = new File(dirPath);
1878                    }
1879    
1880                    while (dir != null && dir.isDirectory() == false) {
1881                        dir = dir.getParentFile();
1882                    }
1883                    long storeLimit = usage.getStoreUsage().getLimit();
1884                    long dirFreeSpace = dir.getUsableSpace();
1885                    if (storeLimit > dirFreeSpace) {
1886                        LOG.warn("Store limit is " + storeLimit / (1024 * 1024) +
1887                                 " mb, whilst the data directory: " + dir.getAbsolutePath() +
1888                                 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1889                    }
1890                }
1891    
1892                long maxJournalFileSize = 0;
1893                long storeLimit = usage.getStoreUsage().getLimit();
1894    
1895                if (adapter instanceof KahaDBPersistenceAdapter) {
1896                    KahaDBPersistenceAdapter kahaDB = (KahaDBPersistenceAdapter) adapter;
1897                    maxJournalFileSize = kahaDB.getJournalMaxFileLength();
1898                } else if (adapter instanceof AMQPersistenceAdapter) {
1899                    AMQPersistenceAdapter amqAdapter = (AMQPersistenceAdapter) adapter;
1900                    maxJournalFileSize = amqAdapter.getMaxFileLength();
1901                }
1902    
1903                if (storeLimit < maxJournalFileSize) {
1904                    LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
1905                              " mb, whilst the max journal file size for the store is: " +
1906                              maxJournalFileSize / (1024 * 1024) + " mb, " +
1907                              "the store will not accept any data when used.");
1908                }
1909            }
1910    
1911            File tmpDir = getTmpDataDirectory();
1912            if (tmpDir != null) {
1913    
1914                String tmpDirPath = tmpDir.getAbsolutePath();
1915                if (!tmpDir.isAbsolute()) {
1916                    tmpDir = new File(tmpDirPath);
1917                }
1918    
1919                long storeLimit = usage.getTempUsage().getLimit();
1920                while (tmpDir != null && tmpDir.isDirectory() == false) {
1921                    tmpDir = tmpDir.getParentFile();
1922                }
1923                long dirFreeSpace = tmpDir.getUsableSpace();
1924                if (storeLimit > dirFreeSpace) {
1925                    LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
1926                              " mb, whilst the temporary data directory: " + tmpDirPath +
1927                              " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space");
1928                }
1929    
1930                if (isPersistent()) {
1931                    long maxJournalFileSize;
1932    
1933                    if (usage.getTempUsage().getStore() != null) {
1934                        maxJournalFileSize = usage.getTempUsage().getStore().getJournalMaxFileLength();
1935                    } else {
1936                        maxJournalFileSize = org.apache.kahadb.journal.Journal.DEFAULT_MAX_FILE_LENGTH;
1937                    }
1938    
1939                    if (storeLimit < maxJournalFileSize) {
1940                        LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
1941                                  " mb, whilst the max journal file size for the temporary store is: " +
1942                                  maxJournalFileSize / (1024 * 1024) + " mb, " +
1943                                  "the temp store will not accept any data when used.");
1944                    }
1945                }
1946            }
1947        }
1948    
1949        public void stopAllConnectors(ServiceStopper stopper) {
1950            for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
1951                NetworkConnector connector = iter.next();
1952                unregisterNetworkConnectorMBean(connector);
1953                stopper.stop(connector);
1954            }
1955            for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
1956                ProxyConnector connector = iter.next();
1957                stopper.stop(connector);
1958            }
1959            for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
1960                JmsConnector connector = iter.next();
1961                stopper.stop(connector);
1962            }
1963            for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
1964                TransportConnector connector = iter.next();
1965                stopper.stop(connector);
1966            }
1967        }
1968    
1969        protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
1970            try {
1971                ObjectName objectName = createConnectorObjectName(connector);
1972                connector = connector.asManagedConnector(getManagementContext(), objectName);
1973                ConnectorViewMBean view = new ConnectorView(connector);
1974                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
1975                return connector;
1976            } catch (Throwable e) {
1977                throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
1978            }
1979        }
1980    
1981        protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
1982            if (isUseJmx()) {
1983                try {
1984                    ObjectName objectName = createConnectorObjectName(connector);
1985                    getManagementContext().unregisterMBean(objectName);
1986                } catch (Throwable e) {
1987                    throw IOExceptionSupport.create(
1988                            "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
1989                }
1990            }
1991        }
1992    
1993        protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1994            return adaptor;
1995        }
1996    
1997        protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
1998            if (isUseJmx()) {
1999            }
2000        }
2001    
2002        private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
2003            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2004                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Connector," + "ConnectorName="
2005                    + JMXSupport.encodeObjectNamePart(connector.getName()));
2006        }
2007    
2008        protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
2009            NetworkConnectorViewMBean view = new NetworkConnectorView(connector);
2010            try {
2011                ObjectName objectName = createNetworkConnectorObjectName(connector);
2012                connector.setObjectName(objectName);
2013                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2014            } catch (Throwable e) {
2015                throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
2016            }
2017        }
2018    
2019        protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
2020                throws MalformedObjectNameException {
2021            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2022                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
2023                    + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
2024        }
2025    
2026    
2027        public ObjectName createDuplexNetworkConnectorObjectName(String transport)
2028                throws MalformedObjectNameException {
2029            return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2030                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=NetworkConnector,"
2031                    + "NetworkConnectorName=duplex" + JMXSupport.encodeObjectNamePart(transport));
2032        }
2033    
2034        protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
2035            if (isUseJmx()) {
2036                try {
2037                    ObjectName objectName = createNetworkConnectorObjectName(connector);
2038                    getManagementContext().unregisterMBean(objectName);
2039                } catch (Exception e) {
2040                    LOG.error("Network Connector could not be unregistered from JMX: " + e, e);
2041                }
2042            }
2043        }
2044    
2045        protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
2046            ProxyConnectorView view = new ProxyConnectorView(connector);
2047            try {
2048                ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2049                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=ProxyConnector,"
2050                        + "ProxyConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
2051                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2052            } catch (Throwable e) {
2053                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2054            }
2055        }
2056    
2057        protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
2058            FTConnectorView view = new FTConnectorView(connector);
2059            try {
2060                ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2061                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
2062                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2063            } catch (Throwable e) {
2064                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2065            }
2066        }
2067    
2068        protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
2069            JmsConnectorView view = new JmsConnectorView(connector);
2070            try {
2071                ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2072                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=JmsConnector,"
2073                        + "JmsConnectorName=" + JMXSupport.encodeObjectNamePart(connector.getName()));
2074                AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2075            } catch (Throwable e) {
2076                throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
2077            }
2078        }
2079    
2080        /**
2081         * Factory method to create a new broker
2082         *
2083         * @throws Exception
2084         * @throws
2085         * @throws
2086         */
2087        protected Broker createBroker() throws Exception {
2088            regionBroker = createRegionBroker();
2089            Broker broker = addInterceptors(regionBroker);
2090            // Add a filter that will stop access to the broker once stopped
2091            broker = new MutableBrokerFilter(broker) {
2092                Broker old;
2093    
2094                @Override
2095                public void stop() throws Exception {
2096                    old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
2097                        // Just ignore additional stop actions.
2098                        @Override
2099                        public void stop() throws Exception {
2100                        }
2101                    });
2102                    old.stop();
2103                }
2104    
2105                @Override
2106                public void start() throws Exception {
2107                    if (forceStart && old != null) {
2108                        this.next.set(old);
2109                    }
2110                    getNext().start();
2111                }
2112            };
2113            return broker;
2114        }
2115    
2116        /**
2117         * Factory method to create the core region broker onto which interceptors
2118         * are added
2119         *
2120         * @throws Exception
2121         */
2122        protected Broker createRegionBroker() throws Exception {
2123            if (destinationInterceptors == null) {
2124                destinationInterceptors = createDefaultDestinationInterceptor();
2125            }
2126            configureServices(destinationInterceptors);
2127            DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors);
2128            if (destinationFactory == null) {
2129                destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter());
2130            }
2131            return createRegionBroker(destinationInterceptor);
2132        }
2133    
2134        protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
2135            RegionBroker regionBroker;
2136            if (isUseJmx()) {
2137                regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
2138                        getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
2139            } else {
2140                regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
2141                        destinationInterceptor,getScheduler(),getExecutor());
2142            }
2143            destinationFactory.setRegionBroker(regionBroker);
2144            regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
2145            regionBroker.setBrokerName(getBrokerName());
2146            regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
2147            regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
2148            if (brokerId != null) {
2149                regionBroker.setBrokerId(brokerId);
2150            }
2151            return regionBroker;
2152        }
2153    
2154        /**
2155         * Create the default destination interceptor
2156         */
2157        protected DestinationInterceptor[] createDefaultDestinationInterceptor() {
2158            List<DestinationInterceptor> answer = new ArrayList<DestinationInterceptor>();
2159            if (isUseVirtualTopics()) {
2160                VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
2161                VirtualTopic virtualTopic = new VirtualTopic();
2162                virtualTopic.setName("VirtualTopic.>");
2163                VirtualDestination[] virtualDestinations = { virtualTopic };
2164                interceptor.setVirtualDestinations(virtualDestinations);
2165                answer.add(interceptor);
2166            }
2167            if (isUseMirroredQueues()) {
2168                MirroredQueue interceptor = new MirroredQueue();
2169                answer.add(interceptor);
2170            }
2171            DestinationInterceptor[] array = new DestinationInterceptor[answer.size()];
2172            answer.toArray(array);
2173            return array;
2174        }
2175    
2176        /**
2177         * Strategy method to add interceptors to the broker
2178         *
2179         * @throws IOException
2180         */
2181        protected Broker addInterceptors(Broker broker) throws Exception {
2182            if (isSchedulerSupport()) {
2183                SchedulerBroker sb = new SchedulerBroker(broker, getSchedulerDirectoryFile());
2184                if (isUseJmx()) {
2185                    JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
2186                    try {
2187                        ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":"
2188                                + "BrokerName=" + JMXSupport.encodeObjectNamePart(getBrokerName()) + ","
2189                                + "Type=jobScheduler," + "jobSchedulerName=JMS");
2190    
2191                        AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
2192                        this.adminView.setJMSJobScheduler(objectName);
2193                    } catch (Throwable e) {
2194                        throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
2195                                + e.getMessage(), e);
2196                    }
2197    
2198                }
2199                broker = sb;
2200            }
2201            if (isAdvisorySupport()) {
2202                broker = new AdvisoryBroker(broker);
2203            }
2204            broker = new CompositeDestinationBroker(broker);
2205            broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
2206            if (isPopulateJMSXUserID()) {
2207                UserIDBroker userIDBroker = new UserIDBroker(broker);
2208                userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
2209                broker = userIDBroker;
2210            }
2211            if (isMonitorConnectionSplits()) {
2212                broker = new ConnectionSplitBroker(broker);
2213            }
2214            if (plugins != null) {
2215                for (int i = 0; i < plugins.length; i++) {
2216                    BrokerPlugin plugin = plugins[i];
2217                    broker = plugin.installPlugin(broker);
2218                }
2219            }
2220            return broker;
2221        }
2222    
2223        protected PersistenceAdapter createPersistenceAdapter() throws IOException {
2224            if (isPersistent()) {
2225                PersistenceAdapterFactory fac = getPersistenceFactory();
2226                if (fac != null) {
2227                    return fac.createPersistenceAdapter();
2228                }else {
2229                    KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
2230                    File dir = new File(getBrokerDataDirectory(),"KahaDB");
2231                    adaptor.setDirectory(dir);
2232                    return adaptor;
2233                }
2234            } else {
2235                return new MemoryPersistenceAdapter();
2236            }
2237        }
2238    
2239        protected ObjectName createBrokerObjectName() throws IOException {
2240            try {
2241                return new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
2242                        + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=Broker");
2243            } catch (Throwable e) {
2244                throw IOExceptionSupport.create("Invalid JMX broker name: " + brokerName, e);
2245            }
2246        }
2247    
2248        protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {
2249            TransportServer transport = TransportFactory.bind(this, brokerURI);
2250            return new TransportConnector(transport);
2251        }
2252    
2253        /**
2254         * Extracts the port from the options
2255         */
2256        protected Object getPort(Map options) {
2257            Object port = options.get("port");
2258            if (port == null) {
2259                port = DEFAULT_PORT;
2260                LOG.warn("No port specified so defaulting to: " + port);
2261            }
2262            return port;
2263        }
2264    
2265        protected void addShutdownHook() {
2266            if (useShutdownHook) {
2267                shutdownHook = new Thread("ActiveMQ ShutdownHook") {
2268                    @Override
2269                    public void run() {
2270                        containerShutdown();
2271                    }
2272                };
2273                Runtime.getRuntime().addShutdownHook(shutdownHook);
2274            }
2275        }
2276    
2277        protected void removeShutdownHook() {
2278            if (shutdownHook != null) {
2279                try {
2280                    Runtime.getRuntime().removeShutdownHook(shutdownHook);
2281                } catch (Exception e) {
2282                    LOG.debug("Caught exception, must be shutting down: " + e);
2283                }
2284            }
2285        }
2286    
2287        /**
2288         * Sets hooks to be executed when broker shut down
2289         *
2290         * @org.apache.xbean.Property
2291         */
2292        public void setShutdownHooks(List<Runnable> hooks) throws Exception {
2293            for (Runnable hook : hooks) {
2294                addShutdownHook(hook);
2295            }
2296        }
2297    
2298        /**
2299         * Causes a clean shutdown of the container when the VM is being shut down
2300         */
2301        protected void containerShutdown() {
2302            try {
2303                stop();
2304            } catch (IOException e) {
2305                Throwable linkedException = e.getCause();
2306                if (linkedException != null) {
2307                    logError("Failed to shut down: " + e + ". Reason: " + linkedException, linkedException);
2308                } else {
2309                    logError("Failed to shut down: " + e, e);
2310                }
2311                if (!useLoggingForShutdownErrors) {
2312                    e.printStackTrace(System.err);
2313                }
2314            } catch (Exception e) {
2315                logError("Failed to shut down: " + e, e);
2316            }
2317        }
2318    
2319        protected void logError(String message, Throwable e) {
2320            if (useLoggingForShutdownErrors) {
2321                LOG.error("Failed to shut down: " + e);
2322            } else {
2323                System.err.println("Failed to shut down: " + e);
2324            }
2325        }
2326    
2327        /**
2328         * Starts any configured destinations on startup
2329         */
2330        protected void startDestinations() throws Exception {
2331            if (destinations != null) {
2332                ConnectionContext adminConnectionContext = getAdminConnectionContext();
2333                for (int i = 0; i < destinations.length; i++) {
2334                    ActiveMQDestination destination = destinations[i];
2335                    getBroker().addDestination(adminConnectionContext, destination,true);
2336                }
2337            }
2338            if (isUseVirtualTopics()) {
2339                startVirtualConsumerDestinations();
2340            }
2341        }
2342    
2343        /**
2344         * Returns the broker's administration connection context used for
2345         * configuring the broker at startup
2346         */
2347        public ConnectionContext getAdminConnectionContext() throws Exception {
2348            return BrokerSupport.getConnectionContext(getBroker());
2349        }
2350    
2351        protected void waitForSlave() {
2352            try {
2353                if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
2354                    throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds.");
2355                }
2356            } catch (InterruptedException e) {
2357                LOG.error("Exception waiting for slave:" + e);
2358            }
2359        }
2360    
2361        protected void slaveConnectionEstablished() {
2362            slaveStartSignal.countDown();
2363        }
2364    
2365        protected void startManagementContext() throws Exception {
2366            getManagementContext().setBrokerName(brokerName);
2367            getManagementContext().start();
2368            adminView = new BrokerView(this, null);
2369            ObjectName objectName = getBrokerObjectName();
2370            AnnotatedMBean.registerMBean(getManagementContext(), adminView, objectName);
2371        }
2372    
2373        /**
2374         * Start all transport and network connections, proxies and bridges
2375         *
2376         * @throws Exception
2377         */
2378        public void startAllConnectors() throws Exception {
2379            if (!isSlave()) {
2380                Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
2381                List<TransportConnector> al = new ArrayList<TransportConnector>();
2382                for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
2383                    TransportConnector connector = iter.next();
2384                    connector.setBrokerService(this);
2385                    al.add(startTransportConnector(connector));
2386                }
2387                if (al.size() > 0) {
2388                    // let's clear the transportConnectors list and replace it with
2389                    // the started transportConnector instances
2390                    this.transportConnectors.clear();
2391                    setTransportConnectors(al);
2392                }
2393                URI uri = getVmConnectorURI();
2394                Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
2395                map.put("network", "true");
2396                map.put("async", "false");
2397                uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
2398                if (isWaitForSlave()) {
2399                    waitForSlave();
2400                }
2401                if (!stopped.get()) {
2402                    ThreadPoolExecutor networkConnectorStartExecutor = null;
2403                    if (isNetworkConnectorStartAsync()) {
2404                        // spin up as many threads as needed
2405                        networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
2406                                10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
2407                                new ThreadFactory() {
2408                                    int count=0;
2409                                    public Thread newThread(Runnable runnable) {
2410                                        Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
2411                                        thread.setDaemon(true);
2412                                        return thread;
2413                                    }
2414                                });
2415                    }
2416    
2417                    for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
2418                        final NetworkConnector connector = iter.next();
2419                        connector.setLocalUri(uri);
2420                        connector.setBrokerName(getBrokerName());
2421                        connector.setDurableDestinations(durableDestinations);
2422                        if (getDefaultSocketURIString() != null) {
2423                            connector.setBrokerURL(getDefaultSocketURIString());
2424                        }
2425                        if (networkConnectorStartExecutor != null) {
2426                            networkConnectorStartExecutor.execute(new Runnable() {
2427                                public void run() {
2428                                    try {
2429                                        LOG.info("Async start of " + connector);
2430                                        connector.start();
2431                                    } catch(Exception e) {
2432                                        LOG.error("Async start of network connector: " + connector + " failed", e);
2433                                    }
2434                                }
2435                            });
2436                        } else {
2437                            connector.start();
2438                        }
2439                    }
2440                    if (networkConnectorStartExecutor != null) {
2441                        // executor done when enqueued tasks are complete
2442                        ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
2443                    }
2444    
2445                    for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
2446                        ProxyConnector connector = iter.next();
2447                        connector.start();
2448                    }
2449                    for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); iter.hasNext();) {
2450                        JmsConnector connector = iter.next();
2451                        connector.start();
2452                    }
2453                    for (Service service : services) {
2454                        configureService(service);
2455                        service.start();
2456                    }
2457                }
2458            }
2459        }
2460    
2461        protected TransportConnector startTransportConnector(TransportConnector connector) throws Exception {
2462            connector.setTaskRunnerFactory(getTaskRunnerFactory());
2463            MessageAuthorizationPolicy policy = getMessageAuthorizationPolicy();
2464            if (policy != null) {
2465                connector.setMessageAuthorizationPolicy(policy);
2466            }
2467            if (isUseJmx()) {
2468                connector = registerConnectorMBean(connector);
2469            }
2470            connector.getStatistics().setEnabled(enableStatistics);
2471            connector.start();
2472            return connector;
2473        }
2474    
2475        /**
2476         * Perform any custom dependency injection
2477         */
2478        protected void configureServices(Object[] services) {
2479            for (Object service : services) {
2480                configureService(service);
2481            }
2482        }
2483    
2484        /**
2485         * Perform any custom dependency injection
2486         */
2487        protected void configureService(Object service) {
2488            if (service instanceof BrokerServiceAware) {
2489                BrokerServiceAware serviceAware = (BrokerServiceAware) service;
2490                serviceAware.setBrokerService(this);
2491            }
2492            if (masterConnector == null) {
2493                if (service instanceof MasterConnector) {
2494                    masterConnector = (MasterConnector) service;
2495                    supportFailOver = true;
2496                }
2497            }
2498        }
2499    
2500        public void handleIOException(IOException exception) {
2501            if (ioExceptionHandler != null) {
2502                ioExceptionHandler.handle(exception);
2503             } else {
2504                LOG.info("No IOExceptionHandler registered, ignoring IO exception, " + exception, exception);
2505             }
2506        }
2507    
2508        protected void startVirtualConsumerDestinations() throws Exception {
2509            ConnectionContext adminConnectionContext = getAdminConnectionContext();
2510            Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
2511            DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
2512            if (!destinations.isEmpty()) {
2513                for (ActiveMQDestination destination : destinations) {
2514                    if (filter.matches(destination) == true) {
2515                        broker.addDestination(adminConnectionContext, destination, false);
2516                    }
2517                }
2518            }
2519        }
2520    
2521        private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
2522            // created at startup, so no sync needed
2523            if (virtualConsumerDestinationFilter == null) {
2524                Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
2525                if (destinationInterceptors != null) {
2526                    for (DestinationInterceptor interceptor : destinationInterceptors) {
2527                        if (interceptor instanceof VirtualDestinationInterceptor) {
2528                            VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
2529                            for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
2530                                if (virtualDestination instanceof VirtualTopic) {
2531                                    consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
2532                                }
2533                            }
2534                        }
2535                    }
2536                }
2537                ActiveMQQueue filter = new ActiveMQQueue();
2538                filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
2539                virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
2540            }
2541            return virtualConsumerDestinationFilter;
2542        }
2543    
2544        protected synchronized ThreadPoolExecutor getExecutor() {
2545            if (this.executor == null) {
2546                this.executor = new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
2547    
2548                    private long i = 0;
2549    
2550                    @Override
2551                    public Thread newThread(Runnable runnable) {
2552                        this.i++;
2553                        Thread thread = new Thread(runnable, "ActiveMQ BrokerService.worker." + this.i);
2554                        thread.setDaemon(true);
2555                        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
2556                            @Override
2557                            public void uncaughtException(final Thread t, final Throwable e) {
2558                                LOG.error("Error in thread '{}'", t.getName(), e);
2559                            }
2560                        });
2561                        return thread;
2562                    }
2563                }, new RejectedExecutionHandler() {
2564                    @Override
2565                    public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
2566                        try {
2567                            executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
2568                        } catch (InterruptedException e) {
2569                            throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
2570                        }
2571    
2572                        throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
2573                    }
2574                });
2575            }
2576            return this.executor;
2577        }
2578    
2579        public synchronized Scheduler getScheduler() {
2580            if (this.scheduler==null) {
2581                this.scheduler = new Scheduler("ActiveMQ Broker["+getBrokerName()+"] Scheduler");
2582                try {
2583                    this.scheduler.start();
2584                } catch (Exception e) {
2585                   LOG.error("Failed to start Scheduler ",e);
2586                }
2587            }
2588            return this.scheduler;
2589        }
2590    
2591        public Broker getRegionBroker() {
2592            return regionBroker;
2593        }
2594    
2595        public void setRegionBroker(Broker regionBroker) {
2596            this.regionBroker = regionBroker;
2597        }
2598    
2599        public void addShutdownHook(Runnable hook) {
2600            synchronized (shutdownHooks) {
2601                shutdownHooks.add(hook);
2602            }
2603        }
2604    
2605        public void removeShutdownHook(Runnable hook) {
2606            synchronized (shutdownHooks) {
2607                shutdownHooks.remove(hook);
2608            }
2609        }
2610    
2611        public boolean isSystemExitOnShutdown() {
2612            return systemExitOnShutdown;
2613        }
2614    
2615        /**
2616         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2617         */
2618        public void setSystemExitOnShutdown(boolean systemExitOnShutdown) {
2619            this.systemExitOnShutdown = systemExitOnShutdown;
2620        }
2621    
2622        public int getSystemExitOnShutdownExitCode() {
2623            return systemExitOnShutdownExitCode;
2624        }
2625    
2626        public void setSystemExitOnShutdownExitCode(int systemExitOnShutdownExitCode) {
2627            this.systemExitOnShutdownExitCode = systemExitOnShutdownExitCode;
2628        }
2629    
2630        public SslContext getSslContext() {
2631            return sslContext;
2632        }
2633    
2634        public void setSslContext(SslContext sslContext) {
2635            this.sslContext = sslContext;
2636        }
2637    
2638        public boolean isShutdownOnSlaveFailure() {
2639            return shutdownOnSlaveFailure;
2640        }
2641    
2642        /**
2643         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2644         */
2645        public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
2646            this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
2647        }
2648    
2649        public boolean isWaitForSlave() {
2650            return waitForSlave;
2651        }
2652    
2653        /**
2654         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2655         */
2656        public void setWaitForSlave(boolean waitForSlave) {
2657            this.waitForSlave = waitForSlave;
2658        }
2659    
2660        public long getWaitForSlaveTimeout() {
2661            return this.waitForSlaveTimeout;
2662        }
2663    
2664        public void setWaitForSlaveTimeout(long waitForSlaveTimeout) {
2665            this.waitForSlaveTimeout = waitForSlaveTimeout;
2666        }
2667    
2668        public CountDownLatch getSlaveStartSignal() {
2669            return slaveStartSignal;
2670        }
2671    
2672        /**
2673         * Get the passiveSlave
2674         * @return the passiveSlave
2675         */
2676        public boolean isPassiveSlave() {
2677            return this.passiveSlave;
2678        }
2679    
2680        /**
2681         * Set the passiveSlave
2682         * @param passiveSlave the passiveSlave to set
2683         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2684         */
2685        public void setPassiveSlave(boolean passiveSlave) {
2686            this.passiveSlave = passiveSlave;
2687        }
2688    
2689        /**
2690         * override the Default IOException handler, called when persistence adapter
2691         * has experiences File or JDBC I/O Exceptions
2692         *
2693         * @param ioExceptionHandler
2694         */
2695        public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
2696            configureService(ioExceptionHandler);
2697            this.ioExceptionHandler = ioExceptionHandler;
2698        }
2699    
2700        public IOExceptionHandler getIoExceptionHandler() {
2701            return ioExceptionHandler;
2702        }
2703    
2704        /**
2705         * @return the schedulerSupport
2706         */
2707        public boolean isSchedulerSupport() {
2708            return this.schedulerSupport;
2709        }
2710    
2711        /**
2712         * @param schedulerSupport the schedulerSupport to set
2713         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
2714         */
2715        public void setSchedulerSupport(boolean schedulerSupport) {
2716            this.schedulerSupport = schedulerSupport;
2717        }
2718    
2719        /**
2720         * @return the schedulerDirectory
2721         */
2722        public File getSchedulerDirectoryFile() {
2723            if (this.schedulerDirectoryFile == null) {
2724                this.schedulerDirectoryFile = new File(getBrokerDataDirectory(), "scheduler");
2725            }
2726            return schedulerDirectoryFile;
2727        }
2728    
2729        /**
2730         * @param schedulerDirectory the schedulerDirectory to set
2731         */
2732        public void setSchedulerDirectoryFile(File schedulerDirectory) {
2733            this.schedulerDirectoryFile = schedulerDirectory;
2734        }
2735    
2736        public void setSchedulerDirectory(String schedulerDirectory) {
2737            setSchedulerDirectoryFile(new File(schedulerDirectory));
2738        }
2739    
2740        public int getSchedulePeriodForDestinationPurge() {
2741            return this.schedulePeriodForDestinationPurge;
2742        }
2743    
2744        public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
2745            this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
2746        }
2747    
2748        public int getMaxPurgedDestinationsPerSweep() {
2749            return this.maxPurgedDestinationsPerSweep;
2750        }
2751    
2752        public void setMaxPurgedDestinationsPerSweep(int maxPurgedDestinationsPerSweep) {
2753            this.maxPurgedDestinationsPerSweep = maxPurgedDestinationsPerSweep;
2754        }
2755    
2756        public BrokerContext getBrokerContext() {
2757            return brokerContext;
2758        }
2759    
2760        public void setBrokerContext(BrokerContext brokerContext) {
2761            this.brokerContext = brokerContext;
2762        }
2763    
2764        public void setBrokerId(String brokerId) {
2765            this.brokerId = new BrokerId(brokerId);
2766        }
2767    
2768        public boolean isUseAuthenticatedPrincipalForJMSXUserID() {
2769            return useAuthenticatedPrincipalForJMSXUserID;
2770        }
2771    
2772        public void setUseAuthenticatedPrincipalForJMSXUserID(boolean useAuthenticatedPrincipalForJMSXUserID) {
2773            this.useAuthenticatedPrincipalForJMSXUserID = useAuthenticatedPrincipalForJMSXUserID;
2774        }
2775    
2776        /**
2777         * Should MBeans that support showing the Authenticated User Name information have this
2778         * value filled in or not.
2779         *
2780         * @return true if user names should be exposed in MBeans
2781         */
2782        public boolean isPopulateUserNameInMBeans() {
2783            return this.populateUserNameInMBeans;
2784        }
2785    
2786        /**
2787         * Sets whether Authenticated User Name information is shown in MBeans that support this field.
2788         * @param value if MBeans should expose user name information.
2789         */
2790        public void setPopulateUserNameInMBeans(boolean value) {
2791            this.populateUserNameInMBeans = value;
2792        }
2793    
2794        /**
2795         * Gets the time in Milliseconds that an invocation of an MBean method will wait before
2796         * failing.  The default value is to wait forever (zero).
2797         *
2798         * @return timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
2799         */
2800        public long getMbeanInvocationTimeout() {
2801            return mbeanInvocationTimeout;
2802        }
2803    
2804        /**
2805         * Gets the time in Milliseconds that an invocation of an MBean method will wait before
2806         * failing. The default value is to wait forever (zero).
2807         *
2808         * @param mbeanInvocationTimeout
2809         *      timeout in milliseconds before MBean calls fail, (default is 0 or no timeout).
2810         */
2811        public void setMbeanInvocationTimeout(long mbeanInvocationTimeout) {
2812            this.mbeanInvocationTimeout = mbeanInvocationTimeout;
2813        }
2814    
2815        public boolean isNetworkConnectorStartAsync() {
2816            return networkConnectorStartAsync;
2817        }
2818    
2819        public void setNetworkConnectorStartAsync(boolean networkConnectorStartAsync) {
2820            this.networkConnectorStartAsync = networkConnectorStartAsync;
2821        }
2822    
2823        public boolean isAllowTempAutoCreationOnSend() {
2824            return allowTempAutoCreationOnSend;
2825        }
2826    
2827        /**
2828         * enable if temp destinations need to be propagated through a network when
2829         * advisorySupport==false. This is used in conjunction with the policy
2830         * gcInactiveDestinations for matching temps so they can get removed
2831         * when inactive
2832         *
2833         * @param allowTempAutoCreationOnSend
2834         */
2835        public void setAllowTempAutoCreationOnSend(boolean allowTempAutoCreationOnSend) {
2836            this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
2837        }
2838    
2839        public int getOfflineDurableSubscriberTimeout() {
2840            return offlineDurableSubscriberTimeout;
2841        }
2842    
2843        public void setOfflineDurableSubscriberTimeout(int offlineDurableSubscriberTimeout) {
2844            this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
2845        }
2846    
2847        public int getOfflineDurableSubscriberTaskSchedule() {
2848            return offlineDurableSubscriberTaskSchedule;
2849        }
2850    
2851        public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) {
2852            this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
2853        }
2854    
2855        public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
2856            return isUseVirtualTopics() && destination.isQueue() &&
2857                    getVirtualTopicConsumerDestinationFilter().matches(destination);
2858        }
2859    
2860        public Throwable getStartException() {
2861            return startException;
2862        }
2863    
2864        public boolean isStartAsync() {
2865            return startAsync;
2866        }
2867    
2868        public void setStartAsync(boolean startAsync) {
2869            this.startAsync = startAsync;
2870        }
2871    }