001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.security.AccessController;
023import java.security.PrivilegedAction;
024import java.util.*;
025import java.util.concurrent.RejectedExecutionHandler;
026
027import javax.jms.Connection;
028import javax.jms.ConnectionFactory;
029import javax.jms.ExceptionListener;
030import javax.jms.JMSException;
031import javax.jms.QueueConnection;
032import javax.jms.QueueConnectionFactory;
033import javax.jms.TopicConnection;
034import javax.jms.TopicConnectionFactory;
035import javax.naming.Context;
036
037import org.apache.activemq.blob.BlobTransferPolicy;
038import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
039import org.apache.activemq.jndi.JNDIBaseStorable;
040import org.apache.activemq.management.JMSStatsImpl;
041import org.apache.activemq.management.StatsCapable;
042import org.apache.activemq.management.StatsImpl;
043import org.apache.activemq.thread.TaskRunnerFactory;
044import org.apache.activemq.transport.Transport;
045import org.apache.activemq.transport.TransportFactory;
046import org.apache.activemq.transport.TransportListener;
047import org.apache.activemq.util.*;
048import org.apache.activemq.util.URISupport.CompositeData;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * A ConnectionFactory is an an Administered object, and is used for creating
054 * Connections. <p/> This class also implements QueueConnectionFactory and
055 * TopicConnectionFactory. You can use this connection to create both
056 * QueueConnections and TopicConnections.
057 *
058 *
059 * @see javax.jms.ConnectionFactory
060 */
061public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
062    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionFactory.class);
063    private static final String DEFAULT_BROKER_HOST;
064    private static final int DEFAULT_BROKER_PORT;
065    static{
066        String host = null;
067        String port = null;
068        try {
069             host = AccessController.doPrivileged(new PrivilegedAction<String>() {
070                 @Override
071                 public String run() {
072                     String result = System.getProperty("org.apache.activemq.AMQ_HOST");
073                     result = (result==null||result.isEmpty()) ?  System.getProperty("AMQ_HOST","localhost") : result;
074                     return result;
075                 }
076             });
077             port = AccessController.doPrivileged(new PrivilegedAction<String>() {
078                 @Override
079                 public String run() {
080                     String result = System.getProperty("org.apache.activemq.AMQ_PORT");
081                     result = (result==null||result.isEmpty()) ?  System.getProperty("AMQ_PORT","61616") : result;
082                     return result;
083                 }
084             });
085        }catch(Throwable e){
086            LOG.debug("Failed to look up System properties for host and port",e);
087        }
088        host = (host == null || host.isEmpty()) ? "localhost" : host;
089        port = (port == null || port.isEmpty()) ? "61616" : port;
090        DEFAULT_BROKER_HOST = host;
091        DEFAULT_BROKER_PORT = Integer.parseInt(port);
092    }
093
094
095    public static final String DEFAULT_BROKER_BIND_URL;
096
097    static{
098        final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
099        String bindURL = null;
100
101        try {
102            bindURL = AccessController.doPrivileged(new PrivilegedAction<String>() {
103                @Override
104                public String run() {
105                    String result = System.getProperty("org.apache.activemq.BROKER_BIND_URL");
106                    result = (result==null||result.isEmpty()) ?  System.getProperty("BROKER_BIND_URL",defaultURL) : result;
107                    return result;
108                }
109            });
110        }catch(Throwable e){
111            LOG.debug("Failed to look up System properties for host and port",e);
112        }
113        bindURL = (bindURL == null || bindURL.isEmpty()) ? defaultURL : bindURL;
114        DEFAULT_BROKER_BIND_URL = bindURL;
115    }
116
117    public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
118    public static final String DEFAULT_USER = null;
119    public static final String DEFAULT_PASSWORD = null;
120    public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
121
122    protected URI brokerURL;
123    protected String userName;
124    protected String password;
125    protected String clientID;
126    protected boolean dispatchAsync=true;
127    protected boolean alwaysSessionAsync=true;
128
129    JMSStatsImpl factoryStats = new JMSStatsImpl();
130
131    private IdGenerator clientIdGenerator;
132    private String clientIDPrefix;
133    private IdGenerator connectionIdGenerator;
134    private String connectionIDPrefix;
135
136    // client policies
137    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
138    private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
139    {
140        redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy());
141    }
142    private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
143    private MessageTransformer transformer;
144
145    private boolean disableTimeStampsByDefault;
146    private boolean optimizedMessageDispatch = true;
147    private long optimizeAcknowledgeTimeOut = 300;
148    private long optimizedAckScheduledAckInterval = 0;
149    private boolean copyMessageOnSend = true;
150    private boolean useCompression;
151    private boolean objectMessageSerializationDefered;
152    private boolean useAsyncSend;
153    private boolean optimizeAcknowledge;
154    private int closeTimeout = 15000;
155    private boolean useRetroactiveConsumer;
156    private boolean exclusiveConsumer;
157    private boolean nestedMapAndListEnabled = true;
158    private boolean alwaysSyncSend;
159    private boolean watchTopicAdvisories = true;
160    private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
161    private long warnAboutUnstartedConnectionTimeout = 500L;
162    private int sendTimeout = 0;
163    private boolean sendAcksAsync=true;
164    private TransportListener transportListener;
165    private ExceptionListener exceptionListener;
166    private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
167    private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
168    private boolean useDedicatedTaskRunner;
169    private long consumerFailoverRedeliveryWaitPeriod = 0;
170    private boolean checkForDuplicates = true;
171    private ClientInternalExceptionListener clientInternalExceptionListener;
172    private boolean messagePrioritySupported = false;
173    private boolean transactedIndividualAck = false;
174    private boolean nonBlockingRedelivery = false;
175    private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
176    private TaskRunnerFactory sessionTaskRunner;
177    private RejectedExecutionHandler rejectedTaskHandler = null;
178    protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class
179    private boolean rmIdFromConnectionId = false;
180    private boolean consumerExpiryCheckEnabled = true;
181    private List<String> trustedPackages = Arrays.asList(ClassLoadingAwareObjectInputStream.serializablePackages);
182    private boolean trustAllPackages = false;
183
184    // /////////////////////////////////////////////
185    //
186    // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
187    //
188    // /////////////////////////////////////////////
189
190    public ActiveMQConnectionFactory() {
191        this(DEFAULT_BROKER_URL);
192    }
193
194    public ActiveMQConnectionFactory(String brokerURL) {
195        this(createURI(brokerURL));
196    }
197
198    public ActiveMQConnectionFactory(URI brokerURL) {
199        setBrokerURL(brokerURL.toString());
200    }
201
202    public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
203        setUserName(userName);
204        setPassword(password);
205        setBrokerURL(brokerURL.toString());
206    }
207
208    public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
209        setUserName(userName);
210        setPassword(password);
211        setBrokerURL(brokerURL);
212    }
213
214    /**
215     * Returns a copy of the given connection factory
216     */
217    public ActiveMQConnectionFactory copy() {
218        try {
219            return (ActiveMQConnectionFactory)super.clone();
220        } catch (CloneNotSupportedException e) {
221            throw new RuntimeException("This should never happen: " + e, e);
222        }
223    }
224
225    /*boolean*
226     * @param brokerURL
227     * @return
228     * @throws URISyntaxException
229     */
230    private static URI createURI(String brokerURL) {
231        try {
232            return new URI(brokerURL);
233        } catch (URISyntaxException e) {
234            throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
235        }
236    }
237
238    /**
239     * @return Returns the Connection.
240     */
241    @Override
242    public Connection createConnection() throws JMSException {
243        return createActiveMQConnection();
244    }
245
246    /**
247     * @return Returns the Connection.
248     */
249    @Override
250    public Connection createConnection(String userName, String password) throws JMSException {
251        return createActiveMQConnection(userName, password);
252    }
253
254    /**
255     * @return Returns the QueueConnection.
256     * @throws JMSException
257     */
258    @Override
259    public QueueConnection createQueueConnection() throws JMSException {
260        return createActiveMQConnection().enforceQueueOnlyConnection();
261    }
262
263    /**
264     * @return Returns the QueueConnection.
265     */
266    @Override
267    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
268        return createActiveMQConnection(userName, password).enforceQueueOnlyConnection();
269    }
270
271    /**
272     * @return Returns the TopicConnection.
273     * @throws JMSException
274     */
275    @Override
276    public TopicConnection createTopicConnection() throws JMSException {
277        return createActiveMQConnection();
278    }
279
280    /**
281     * @return Returns the TopicConnection.
282     */
283    @Override
284    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
285        return createActiveMQConnection(userName, password);
286    }
287
288    /**
289     * @return the StatsImpl associated with this ConnectionFactory.
290     */
291    @Override
292    public StatsImpl getStats() {
293        return this.factoryStats;
294    }
295
296    // /////////////////////////////////////////////
297    //
298    // Implementation methods.
299    //
300    // /////////////////////////////////////////////
301
302    protected ActiveMQConnection createActiveMQConnection() throws JMSException {
303        return createActiveMQConnection(userName, password);
304    }
305
306    /**
307     * Creates a Transport based on this object's connection settings. Separated
308     * from createActiveMQConnection to allow for subclasses to override.
309     *
310     * @return The newly created Transport.
311     * @throws JMSException If unable to create trasnport.
312     */
313    protected Transport createTransport() throws JMSException {
314        try {
315            URI connectBrokerUL = brokerURL;
316            String scheme = brokerURL.getScheme();
317            if (scheme == null) {
318                throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
319            }
320            if (scheme.equals("auto")) {
321                connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
322            } else if (scheme.equals("auto+ssl")) {
323                connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
324            } else if (scheme.equals("auto+nio")) {
325                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
326            } else if (scheme.equals("auto+nio+ssl")) {
327                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
328            }
329
330            return TransportFactory.connect(connectBrokerUL);
331        } catch (Exception e) {
332            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
333        }
334    }
335
336    /**
337     * @return Returns the Connection.
338     */
339    protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
340        if (brokerURL == null) {
341            throw new ConfigurationException("brokerURL not set.");
342        }
343        ActiveMQConnection connection = null;
344        try {
345            Transport transport = createTransport();
346            connection = createActiveMQConnection(transport, factoryStats);
347
348            connection.setUserName(userName);
349            connection.setPassword(password);
350
351            configureConnection(connection);
352
353            transport.start();
354
355            if (clientID != null) {
356                connection.setDefaultClientID(clientID);
357            }
358
359            return connection;
360        } catch (JMSException e) {
361            // Clean up!
362            try {
363                connection.close();
364            } catch (Throwable ignore) {
365            }
366            throw e;
367        } catch (Exception e) {
368            // Clean up!
369            try {
370                connection.close();
371            } catch (Throwable ignore) {
372            }
373            throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
374        }
375    }
376
377    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
378        ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
379                getConnectionIdGenerator(), stats);
380        return connection;
381    }
382
383    protected void configureConnection(ActiveMQConnection connection) throws JMSException {
384        connection.setPrefetchPolicy(getPrefetchPolicy());
385        connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
386        connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
387        connection.setCopyMessageOnSend(isCopyMessageOnSend());
388        connection.setUseCompression(isUseCompression());
389        connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
390        connection.setDispatchAsync(isDispatchAsync());
391        connection.setUseAsyncSend(isUseAsyncSend());
392        connection.setAlwaysSyncSend(isAlwaysSyncSend());
393        connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
394        connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
395        connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
396        connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
397        connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
398        connection.setExclusiveConsumer(isExclusiveConsumer());
399        connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
400        connection.setTransformer(getTransformer());
401        connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
402        connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
403        connection.setProducerWindowSize(getProducerWindowSize());
404        connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
405        connection.setSendTimeout(getSendTimeout());
406        connection.setCloseTimeout(getCloseTimeout());
407        connection.setSendAcksAsync(isSendAcksAsync());
408        connection.setAuditDepth(getAuditDepth());
409        connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
410        connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
411        connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
412        connection.setCheckForDuplicates(isCheckForDuplicates());
413        connection.setMessagePrioritySupported(isMessagePrioritySupported());
414        connection.setTransactedIndividualAck(isTransactedIndividualAck());
415        connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
416        connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
417        connection.setSessionTaskRunner(getSessionTaskRunner());
418        connection.setRejectedTaskHandler(getRejectedTaskHandler());
419        connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
420        connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
421        connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled());
422        connection.setTrustedPackages(getTrustedPackages());
423        connection.setTrustAllPackages(isTrustAllPackages());
424        if (transportListener != null) {
425            connection.addTransportListener(transportListener);
426        }
427        if (exceptionListener != null) {
428            connection.setExceptionListener(exceptionListener);
429        }
430        if (clientInternalExceptionListener != null) {
431            connection.setClientInternalExceptionListener(clientInternalExceptionListener);
432        }
433    }
434
435    // /////////////////////////////////////////////
436    //
437    // Property Accessors
438    //
439    // /////////////////////////////////////////////
440
441    public String getBrokerURL() {
442        return brokerURL == null ? null : brokerURL.toString();
443    }
444
445    /**
446     * Sets the <a
447     * href="http://activemq.apache.org/configuring-transports.html">connection
448     * URL</a> used to connect to the ActiveMQ broker.
449     */
450    public void setBrokerURL(String brokerURL) {
451        this.brokerURL = createURI(brokerURL);
452
453        // Use all the properties prefixed with 'jms.' to set the connection
454        // factory
455        // options.
456        if (this.brokerURL.getQuery() != null) {
457            // It might be a standard URI or...
458            try {
459
460                Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery());
461                Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms.");
462                if (buildFromMap(jmsOptionsMap)) {
463                    if (!jmsOptionsMap.isEmpty()) {
464                        String msg = "There are " + jmsOptionsMap.size()
465                            + " jms options that couldn't be set on the ConnectionFactory."
466                            + " Check the options are spelled correctly."
467                            + " Unknown parameters=[" + jmsOptionsMap + "]."
468                            + " This connection factory cannot be started.";
469                        throw new IllegalArgumentException(msg);
470                    }
471
472                    this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
473                }
474
475            } catch (URISyntaxException e) {
476            }
477
478        } else {
479
480            // It might be a composite URI.
481            try {
482                CompositeData data = URISupport.parseComposite(this.brokerURL);
483                Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms.");
484                if (buildFromMap(jmsOptionsMap)) {
485                    if (!jmsOptionsMap.isEmpty()) {
486                        String msg = "There are " + jmsOptionsMap.size()
487                            + " jms options that couldn't be set on the ConnectionFactory."
488                            + " Check the options are spelled correctly."
489                            + " Unknown parameters=[" + jmsOptionsMap + "]."
490                            + " This connection factory cannot be started.";
491                        throw new IllegalArgumentException(msg);
492                    }
493
494                    this.brokerURL = data.toURI();
495                }
496            } catch (URISyntaxException e) {
497            }
498        }
499    }
500
501    public String getClientID() {
502        return clientID;
503    }
504
505    /**
506     * Sets the JMS clientID to use for the created connection. Note that this
507     * can only be used by one connection at once so generally its a better idea
508     * to set the clientID on a Connection
509     */
510    public void setClientID(String clientID) {
511        this.clientID = clientID;
512    }
513
514    public boolean isCopyMessageOnSend() {
515        return copyMessageOnSend;
516    }
517
518    /**
519     * Should a JMS message be copied to a new JMS Message object as part of the
520     * send() method in JMS. This is enabled by default to be compliant with the
521     * JMS specification. You can disable it if you do not mutate JMS messages
522     * after they are sent for a performance boost
523     */
524    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
525        this.copyMessageOnSend = copyMessageOnSend;
526    }
527
528    public boolean isDisableTimeStampsByDefault() {
529        return disableTimeStampsByDefault;
530    }
531
532    /**
533     * Sets whether or not timestamps on messages should be disabled or not. If
534     * you disable them it adds a small performance boost.
535     */
536    public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
537        this.disableTimeStampsByDefault = disableTimeStampsByDefault;
538    }
539
540    public boolean isOptimizedMessageDispatch() {
541        return optimizedMessageDispatch;
542    }
543
544    /**
545     * If this flag is set then an larger prefetch limit is used - only
546     * applicable for durable topic subscribers.
547     */
548    public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
549        this.optimizedMessageDispatch = optimizedMessageDispatch;
550    }
551
552    public String getPassword() {
553        return password;
554    }
555
556    /**
557     * Sets the JMS password used for connections created from this factory
558     */
559    public void setPassword(String password) {
560        this.password = password;
561    }
562
563    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
564        return prefetchPolicy;
565    }
566
567    /**
568     * Sets the <a
569     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
570     * policy</a> for consumers created by this connection.
571     */
572    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
573        this.prefetchPolicy = prefetchPolicy;
574    }
575
576    public boolean isUseAsyncSend() {
577        return useAsyncSend;
578    }
579
580    public BlobTransferPolicy getBlobTransferPolicy() {
581        return blobTransferPolicy;
582    }
583
584    /**
585     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
586     * OBjects) are transferred from producers to brokers to consumers
587     */
588    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
589        this.blobTransferPolicy = blobTransferPolicy;
590    }
591
592    /**
593     * Forces the use of <a
594     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
595     * adds a massive performance boost; but means that the send() method will
596     * return immediately whether the message has been sent or not which could
597     * lead to message loss.
598     */
599    public void setUseAsyncSend(boolean useAsyncSend) {
600        this.useAsyncSend = useAsyncSend;
601    }
602
603    public synchronized boolean isWatchTopicAdvisories() {
604        return watchTopicAdvisories;
605    }
606
607    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
608        this.watchTopicAdvisories = watchTopicAdvisories;
609    }
610
611    /**
612     * @return true if always sync send messages
613     */
614    public boolean isAlwaysSyncSend() {
615        return this.alwaysSyncSend;
616    }
617
618    /**
619     * Set true if always require messages to be sync sent
620     *
621     * @param alwaysSyncSend
622     */
623    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
624        this.alwaysSyncSend = alwaysSyncSend;
625    }
626
627    public String getUserName() {
628        return userName;
629    }
630
631    /**
632     * Sets the JMS userName used by connections created by this factory
633     */
634    public void setUserName(String userName) {
635        this.userName = userName;
636    }
637
638    public boolean isUseRetroactiveConsumer() {
639        return useRetroactiveConsumer;
640    }
641
642    /**
643     * Sets whether or not retroactive consumers are enabled. Retroactive
644     * consumers allow non-durable topic subscribers to receive old messages
645     * that were published before the non-durable subscriber started.
646     */
647    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
648        this.useRetroactiveConsumer = useRetroactiveConsumer;
649    }
650
651    public boolean isExclusiveConsumer() {
652        return exclusiveConsumer;
653    }
654
655    /**
656     * Enables or disables whether or not queue consumers should be exclusive or
657     * not for example to preserve ordering when not using <a
658     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
659     *
660     * @param exclusiveConsumer
661     */
662    public void setExclusiveConsumer(boolean exclusiveConsumer) {
663        this.exclusiveConsumer = exclusiveConsumer;
664    }
665
666    public RedeliveryPolicy getRedeliveryPolicy() {
667        return redeliveryPolicyMap.getDefaultEntry();
668    }
669
670    /**
671     * Sets the global default redelivery policy to be used when a message is delivered
672     * but the session is rolled back
673     */
674    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
675        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
676    }
677
678    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
679        return this.redeliveryPolicyMap;
680    }
681
682    /**
683     * Sets the global redelivery policy mapping to be used when a message is delivered
684     * but the session is rolled back
685     */
686    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
687        this.redeliveryPolicyMap = redeliveryPolicyMap;
688    }
689
690    public MessageTransformer getTransformer() {
691        return transformer;
692    }
693
694    /**
695     * @return the sendTimeout (in milliseconds)
696     */
697    public int getSendTimeout() {
698        return sendTimeout;
699    }
700
701    /**
702     * @param sendTimeout the sendTimeout to set (in milliseconds)
703     */
704    public void setSendTimeout(int sendTimeout) {
705        this.sendTimeout = sendTimeout;
706    }
707
708    /**
709     * @return the sendAcksAsync
710     */
711    public boolean isSendAcksAsync() {
712        return sendAcksAsync;
713    }
714
715    /**
716     * @param sendAcksAsync the sendAcksAsync to set
717     */
718    public void setSendAcksAsync(boolean sendAcksAsync) {
719        this.sendAcksAsync = sendAcksAsync;
720    }
721
722    /**
723     * @return the messagePrioritySupported
724     */
725    public boolean isMessagePrioritySupported() {
726        return this.messagePrioritySupported;
727    }
728
729    /**
730     * @param messagePrioritySupported the messagePrioritySupported to set
731     */
732    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
733        this.messagePrioritySupported = messagePrioritySupported;
734    }
735
736
737    /**
738     * Sets the transformer used to transform messages before they are sent on
739     * to the JMS bus or when they are received from the bus but before they are
740     * delivered to the JMS client
741     */
742    public void setTransformer(MessageTransformer transformer) {
743        this.transformer = transformer;
744    }
745
746    @SuppressWarnings({ "unchecked", "rawtypes" })
747    @Override
748    public void buildFromProperties(Properties properties) {
749
750        if (properties == null) {
751            properties = new Properties();
752        }
753
754        String temp = properties.getProperty(Context.PROVIDER_URL);
755        if (temp == null || temp.length() == 0) {
756            temp = properties.getProperty("brokerURL");
757        }
758        if (temp != null && temp.length() > 0) {
759            setBrokerURL(temp);
760        }
761
762        Map<String, Object> p = new HashMap(properties);
763        buildFromMap(p);
764    }
765
766    public boolean buildFromMap(Map<String, Object> properties) {
767        boolean rc = false;
768
769        ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
770        if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
771            setPrefetchPolicy(p);
772            rc = true;
773        }
774
775        RedeliveryPolicy rp = new RedeliveryPolicy();
776        if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
777            setRedeliveryPolicy(rp);
778            rc = true;
779        }
780
781        BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
782        if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
783            setBlobTransferPolicy(blobTransferPolicy);
784            rc = true;
785        }
786
787        rc |= IntrospectionSupport.setProperties(this, properties);
788
789        return rc;
790    }
791
792    @Override
793    public void populateProperties(Properties props) {
794        props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
795
796        if (getBrokerURL() != null) {
797            props.setProperty(Context.PROVIDER_URL, getBrokerURL());
798            props.setProperty("brokerURL", getBrokerURL());
799        }
800
801        if (getClientID() != null) {
802            props.setProperty("clientID", getClientID());
803        }
804
805        IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
806        IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
807        IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
808
809        props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
810        props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
811        props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
812        props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
813
814        if (getPassword() != null) {
815            props.setProperty("password", getPassword());
816        }
817
818        props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
819        props.setProperty("useCompression", Boolean.toString(isUseCompression()));
820        props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
821        props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
822
823        if (getUserName() != null) {
824            props.setProperty("userName", getUserName());
825        }
826
827        props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
828        props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
829        props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
830        props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
831        props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
832        props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
833        props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
834        props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
835        props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
836        props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
837        props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
838        props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
839        props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck()));
840        props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery()));
841        props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize()));
842        props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
843        props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod()));
844        props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId()));
845        props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled()));
846    }
847
848    public boolean isUseCompression() {
849        return useCompression;
850    }
851
852    /**
853     * Enables the use of compression of the message bodies
854     */
855    public void setUseCompression(boolean useCompression) {
856        this.useCompression = useCompression;
857    }
858
859    public boolean isObjectMessageSerializationDefered() {
860        return objectMessageSerializationDefered;
861    }
862
863    /**
864     * When an object is set on an ObjectMessage, the JMS spec requires the
865     * object to be serialized by that set method. Enabling this flag causes the
866     * object to not get serialized. The object may subsequently get serialized
867     * if the message needs to be sent over a socket or stored to disk.
868     */
869    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
870        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
871    }
872
873    public boolean isDispatchAsync() {
874        return dispatchAsync;
875    }
876
877    /**
878     * Enables or disables the default setting of whether or not consumers have
879     * their messages <a
880     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
881     * synchronously or asynchronously by the broker</a>. For non-durable
882     * topics for example we typically dispatch synchronously by default to
883     * minimize context switches which boost performance. However sometimes its
884     * better to go slower to ensure that a single blocked consumer socket does
885     * not block delivery to other consumers.
886     *
887     * @param asyncDispatch If true then consumers created on this connection
888     *                will default to having their messages dispatched
889     *                asynchronously. The default value is true.
890     */
891    public void setDispatchAsync(boolean asyncDispatch) {
892        this.dispatchAsync = asyncDispatch;
893    }
894
895    /**
896     * @return Returns the closeTimeout.
897     */
898    public int getCloseTimeout() {
899        return closeTimeout;
900    }
901
902    /**
903     * Sets the timeout before a close is considered complete. Normally a
904     * close() on a connection waits for confirmation from the broker; this
905     * allows that operation to timeout to save the client hanging if there is
906     * no broker
907     */
908    public void setCloseTimeout(int closeTimeout) {
909        this.closeTimeout = closeTimeout;
910    }
911
912    /**
913     * @return Returns the alwaysSessionAsync.
914     */
915    public boolean isAlwaysSessionAsync() {
916        return alwaysSessionAsync;
917    }
918
919    /**
920     * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
921     * the Connection. However, a separate thread is always used if there is more than one session, or the session
922     * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
923     * happens asynchronously.
924     */
925    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
926        this.alwaysSessionAsync = alwaysSessionAsync;
927    }
928
929    /**
930     * @return Returns the optimizeAcknowledge.
931     */
932    public boolean isOptimizeAcknowledge() {
933        return optimizeAcknowledge;
934    }
935
936    /**
937     * @param optimizeAcknowledge The optimizeAcknowledge to set.
938     */
939    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
940        this.optimizeAcknowledge = optimizeAcknowledge;
941    }
942
943    /**
944     * The max time in milliseconds between optimized ack batches
945     * @param optimizeAcknowledgeTimeOut
946     */
947    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
948        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
949    }
950
951    public long getOptimizeAcknowledgeTimeOut() {
952        return optimizeAcknowledgeTimeOut;
953    }
954
955    public boolean isNestedMapAndListEnabled() {
956        return nestedMapAndListEnabled;
957    }
958
959    /**
960     * Enables/disables whether or not Message properties and MapMessage entries
961     * support <a
962     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
963     * Structures</a> of Map and List objects
964     */
965    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
966        this.nestedMapAndListEnabled = structuredMapsEnabled;
967    }
968
969    public String getClientIDPrefix() {
970        return clientIDPrefix;
971    }
972
973    /**
974     * Sets the prefix used by autogenerated JMS Client ID values which are used
975     * if the JMS client does not explicitly specify on.
976     *
977     * @param clientIDPrefix
978     */
979    public void setClientIDPrefix(String clientIDPrefix) {
980        this.clientIDPrefix = clientIDPrefix;
981    }
982
983    protected synchronized IdGenerator getClientIdGenerator() {
984        if (clientIdGenerator == null) {
985            if (clientIDPrefix != null) {
986                clientIdGenerator = new IdGenerator(clientIDPrefix);
987            } else {
988                clientIdGenerator = new IdGenerator();
989            }
990        }
991        return clientIdGenerator;
992    }
993
994    protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
995        this.clientIdGenerator = clientIdGenerator;
996    }
997
998    /**
999     * Sets the prefix used by connection id generator
1000     * @param connectionIDPrefix
1001     */
1002    public void setConnectionIDPrefix(String connectionIDPrefix) {
1003        this.connectionIDPrefix = connectionIDPrefix;
1004    }
1005
1006    protected synchronized IdGenerator getConnectionIdGenerator() {
1007        if (connectionIdGenerator == null) {
1008            if (connectionIDPrefix != null) {
1009                connectionIdGenerator = new IdGenerator(connectionIDPrefix);
1010            } else {
1011                connectionIdGenerator = new IdGenerator();
1012            }
1013        }
1014        return connectionIdGenerator;
1015    }
1016
1017    protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
1018        this.connectionIdGenerator = connectionIdGenerator;
1019    }
1020
1021    /**
1022     * @return the statsEnabled
1023     */
1024    public boolean isStatsEnabled() {
1025        return this.factoryStats.isEnabled();
1026    }
1027
1028    /**
1029     * @param statsEnabled the statsEnabled to set
1030     */
1031    public void setStatsEnabled(boolean statsEnabled) {
1032        this.factoryStats.setEnabled(statsEnabled);
1033    }
1034
1035    public synchronized int getProducerWindowSize() {
1036        return producerWindowSize;
1037    }
1038
1039    public synchronized void setProducerWindowSize(int producerWindowSize) {
1040        this.producerWindowSize = producerWindowSize;
1041    }
1042
1043    public long getWarnAboutUnstartedConnectionTimeout() {
1044        return warnAboutUnstartedConnectionTimeout;
1045    }
1046
1047    /**
1048     * Enables the timeout from a connection creation to when a warning is
1049     * generated if the connection is not properly started via
1050     * {@link Connection#start()} and a message is received by a consumer. It is
1051     * a very common gotcha to forget to <a
1052     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1053     * the connection</a> so this option makes the default case to create a
1054     * warning if the user forgets. To disable the warning just set the value to <
1055     * 0 (say -1).
1056     */
1057    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1058        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1059    }
1060
1061    public TransportListener getTransportListener() {
1062        return transportListener;
1063    }
1064
1065    /**
1066     * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
1067     * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
1068     * a transport listener.
1069     *
1070     * @param transportListener sets the listener to be registered on all connections
1071     * created by this factory
1072     */
1073    public void setTransportListener(TransportListener transportListener) {
1074        this.transportListener = transportListener;
1075    }
1076
1077
1078    public ExceptionListener getExceptionListener() {
1079        return exceptionListener;
1080    }
1081
1082    /**
1083     * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
1084     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1085     * an exception listener.
1086     * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
1087     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1088     * @param exceptionListener sets the exception listener to be registered on all connections
1089     * created by this factory
1090     */
1091    public void setExceptionListener(ExceptionListener exceptionListener) {
1092        this.exceptionListener = exceptionListener;
1093    }
1094
1095    public int getAuditDepth() {
1096        return auditDepth;
1097    }
1098
1099    public void setAuditDepth(int auditDepth) {
1100        this.auditDepth = auditDepth;
1101    }
1102
1103    public int getAuditMaximumProducerNumber() {
1104        return auditMaximumProducerNumber;
1105    }
1106
1107    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
1108        this.auditMaximumProducerNumber = auditMaximumProducerNumber;
1109    }
1110
1111    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1112        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1113    }
1114
1115    public boolean isUseDedicatedTaskRunner() {
1116        return useDedicatedTaskRunner;
1117    }
1118
1119    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
1120        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
1121    }
1122
1123    public long getConsumerFailoverRedeliveryWaitPeriod() {
1124        return consumerFailoverRedeliveryWaitPeriod;
1125    }
1126
1127    public ClientInternalExceptionListener getClientInternalExceptionListener() {
1128        return clientInternalExceptionListener;
1129    }
1130
1131    /**
1132     * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
1133     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1134     * an exception listener.
1135     * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
1136     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1137     * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
1138     * created by this factory
1139     */
1140    public void setClientInternalExceptionListener(
1141            ClientInternalExceptionListener clientInternalExceptionListener) {
1142        this.clientInternalExceptionListener = clientInternalExceptionListener;
1143    }
1144
1145    /**
1146     * @return the checkForDuplicates
1147     */
1148    public boolean isCheckForDuplicates() {
1149        return this.checkForDuplicates;
1150    }
1151
1152    /**
1153     * @param checkForDuplicates the checkForDuplicates to set
1154     */
1155    public void setCheckForDuplicates(boolean checkForDuplicates) {
1156        this.checkForDuplicates = checkForDuplicates;
1157    }
1158
1159    public boolean isTransactedIndividualAck() {
1160         return transactedIndividualAck;
1161     }
1162
1163     /**
1164      * when true, submit individual transacted acks immediately rather than with transaction completion.
1165      * This allows the acks to represent delivery status which can be persisted on rollback
1166      * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean)  true
1167      */
1168     public void setTransactedIndividualAck(boolean transactedIndividualAck) {
1169         this.transactedIndividualAck = transactedIndividualAck;
1170     }
1171
1172
1173     public boolean isNonBlockingRedelivery() {
1174         return nonBlockingRedelivery;
1175     }
1176
1177     /**
1178      * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
1179      * from a rolled back transaction.  This implies that message order will not be preserved and
1180      * also will result in the TransactedIndividualAck option to be enabled.
1181      */
1182     public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
1183         this.nonBlockingRedelivery = nonBlockingRedelivery;
1184     }
1185
1186    public int getMaxThreadPoolSize() {
1187        return maxThreadPoolSize;
1188    }
1189
1190    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
1191        this.maxThreadPoolSize = maxThreadPoolSize;
1192    }
1193
1194    public TaskRunnerFactory getSessionTaskRunner() {
1195        return sessionTaskRunner;
1196    }
1197
1198    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1199        this.sessionTaskRunner = sessionTaskRunner;
1200    }
1201
1202    public RejectedExecutionHandler getRejectedTaskHandler() {
1203        return rejectedTaskHandler;
1204    }
1205
1206    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
1207        this.rejectedTaskHandler = rejectedTaskHandler;
1208    }
1209
1210    /**
1211     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
1212     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
1213     * will not do any background Message acknowledgment.
1214     *
1215     * @return the scheduledOptimizedAckInterval
1216     */
1217    public long getOptimizedAckScheduledAckInterval() {
1218        return optimizedAckScheduledAckInterval;
1219    }
1220
1221    /**
1222     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
1223     * have been configured with optimizeAcknowledge enabled.
1224     *
1225     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
1226     */
1227    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
1228        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
1229    }
1230
1231
1232    public boolean isRmIdFromConnectionId() {
1233        return rmIdFromConnectionId;
1234    }
1235
1236    /**
1237     * uses the connection id as the resource identity for XAResource.isSameRM
1238     * ensuring join will only occur on a single connection
1239     */
1240    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
1241        this.rmIdFromConnectionId = rmIdFromConnectionId;
1242    }
1243
1244    /**
1245     * @return true if MessageConsumer instance will check for expired messages before dispatch.
1246     */
1247    public boolean isConsumerExpiryCheckEnabled() {
1248        return consumerExpiryCheckEnabled;
1249    }
1250
1251    /**
1252     * Controls whether message expiration checking is done in each MessageConsumer
1253     * prior to dispatching a message.  Disabling this check can lead to consumption
1254     * of expired messages.
1255     *
1256     * @param consumerExpiryCheckEnabled
1257     *        controls whether expiration checking is done prior to dispatch.
1258     */
1259    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
1260        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
1261    }
1262
1263    public List<String> getTrustedPackages() {
1264        return trustedPackages;
1265    }
1266
1267    public void setTrustedPackages(List<String> trustedPackages) {
1268        this.trustedPackages = trustedPackages;
1269    }
1270
1271    public boolean isTrustAllPackages() {
1272        return trustAllPackages;
1273    }
1274
1275    public void setTrustAllPackages(boolean trustAllPackages) {
1276        this.trustAllPackages = trustAllPackages;
1277    }
1278}