001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.security.AccessController;
023import java.security.PrivilegedAction;
024import java.util.*;
025import java.util.concurrent.RejectedExecutionHandler;
026
027import javax.jms.Connection;
028import javax.jms.ConnectionFactory;
029import javax.jms.ExceptionListener;
030import javax.jms.JMSException;
031import javax.jms.QueueConnection;
032import javax.jms.QueueConnectionFactory;
033import javax.jms.TopicConnection;
034import javax.jms.TopicConnectionFactory;
035import javax.naming.Context;
036
037import org.apache.activemq.blob.BlobTransferPolicy;
038import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
039import org.apache.activemq.jndi.JNDIBaseStorable;
040import org.apache.activemq.management.JMSStatsImpl;
041import org.apache.activemq.management.StatsCapable;
042import org.apache.activemq.management.StatsImpl;
043import org.apache.activemq.thread.TaskRunnerFactory;
044import org.apache.activemq.transport.Transport;
045import org.apache.activemq.transport.TransportFactory;
046import org.apache.activemq.transport.TransportListener;
047import org.apache.activemq.util.*;
048import org.apache.activemq.util.URISupport.CompositeData;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
 * A ConnectionFactory is an Administered object, and is used for creating
 * Connections. <p/> This class also implements QueueConnectionFactory and
 * TopicConnectionFactory. You can use this connection factory to create both
 * QueueConnections and TopicConnections.
057 *
058 *
059 * @see javax.jms.ConnectionFactory
060 */
061public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
062    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionFactory.class);
063    private static final String DEFAULT_BROKER_HOST;
064    private static final int DEFAULT_BROKER_PORT;
065    static{
066        String host = null;
067        String port = null;
068        try {
069             host = AccessController.doPrivileged(new PrivilegedAction<String>() {
070                 @Override
071                 public String run() {
072                     String result = System.getProperty("org.apache.activemq.AMQ_HOST");
073                     result = (result==null||result.isEmpty()) ?  System.getProperty("AMQ_HOST","localhost") : result;
074                     return result;
075                 }
076             });
077             port = AccessController.doPrivileged(new PrivilegedAction<String>() {
078                 @Override
079                 public String run() {
080                     String result = System.getProperty("org.apache.activemq.AMQ_PORT");
081                     result = (result==null||result.isEmpty()) ?  System.getProperty("AMQ_PORT","61616") : result;
082                     return result;
083                 }
084             });
085        }catch(Throwable e){
086            LOG.debug("Failed to look up System properties for host and port",e);
087        }
088        host = (host == null || host.isEmpty()) ? "localhost" : host;
089        port = (port == null || port.isEmpty()) ? "61616" : port;
090        DEFAULT_BROKER_HOST = host;
091        DEFAULT_BROKER_PORT = Integer.parseInt(port);
092    }
093
094
095    public static final String DEFAULT_BROKER_BIND_URL;
096
097    static{
098        final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
099        String bindURL = null;
100
101        try {
102            bindURL = AccessController.doPrivileged(new PrivilegedAction<String>() {
103                @Override
104                public String run() {
105                    String result = System.getProperty("org.apache.activemq.BROKER_BIND_URL");
106                    result = (result==null||result.isEmpty()) ?  System.getProperty("BROKER_BIND_URL",defaultURL) : result;
107                    return result;
108                }
109            });
110        }catch(Throwable e){
111            LOG.debug("Failed to look up System properties for host and port",e);
112        }
113        bindURL = (bindURL == null || bindURL.isEmpty()) ? defaultURL : bindURL;
114        DEFAULT_BROKER_BIND_URL = bindURL;
115    }
116
    /** Broker URL used by the no-argument constructor: the default bind URL wrapped in the failover scheme. */
    public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
    /** Default user name; null, i.e. none. */
    public static final String DEFAULT_USER = null;
    /** Default password; null, i.e. none. */
    public static final String DEFAULT_PASSWORD = null;
    /** Initial value of the producerWindowSize field. */
    public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
121
    // Broker address; assigned by setBrokerURL(String).
    protected URI brokerURL;
    // Credentials used by the no-argument createActiveMQConnection().
    protected String userName;
    protected String password;
    // When non-null, applied to each new connection via setDefaultClientID.
    protected String clientID;
    protected boolean dispatchAsync=true;
    protected boolean alwaysSessionAsync=true;

    // Statistics object handed to every connection created by this factory and returned by getStats().
    JMSStatsImpl factoryStats = new JMSStatsImpl();

    private IdGenerator clientIdGenerator;
    private String clientIDPrefix;
    private IdGenerator connectionIdGenerator;
    private String connectionIDPrefix;

    // client policies
    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
    {
        // Instance initializer: the map always starts with a default redelivery policy entry.
        redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy());
    }
    private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
    private MessageTransformer transformer;

    // Connection options; configureConnection(...) copies these onto each new connection.
    private boolean disableTimeStampsByDefault;
    private boolean optimizedMessageDispatch = true;
    private long optimizeAcknowledgeTimeOut = 300;
    private long optimizedAckScheduledAckInterval = 0;
    private boolean copyMessageOnSend = true;
    private boolean useCompression;
    private boolean objectMessageSerializationDefered;
    private boolean useAsyncSend;
    private boolean optimizeAcknowledge;
    private int closeTimeout = 15000;
    private boolean useRetroactiveConsumer;
    private boolean exclusiveConsumer;
    private boolean nestedMapAndListEnabled = true;
    private boolean alwaysSyncSend;
    private boolean watchTopicAdvisories = true;
    private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
    private long warnAboutUnstartedConnectionTimeout = 500L;
    // In milliseconds according to the accessor Javadoc.
    private int sendTimeout = 0;
    private int connectResponseTimeout = 0;
    private boolean sendAcksAsync=true;
    // Optional listeners; only registered on a new connection when non-null.
    private TransportListener transportListener;
    private ExceptionListener exceptionListener;
    private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
    private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
    private boolean useDedicatedTaskRunner;
    private long consumerFailoverRedeliveryWaitPeriod = 0;
    private boolean checkForDuplicates = true;
    private ClientInternalExceptionListener clientInternalExceptionListener;
    private boolean messagePrioritySupported = false;
    private boolean transactedIndividualAck = false;
    private boolean nonBlockingRedelivery = false;
    private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
    private TaskRunnerFactory sessionTaskRunner;
    private RejectedExecutionHandler rejectedTaskHandler = null;
    protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class
    private boolean rmIdFromConnectionId = false;
    private boolean consumerExpiryCheckEnabled = true;
    // Starts as a fixed-size list view over the default serializable packages array.
    private List<String> trustedPackages = Arrays.asList(ClassLoadingAwareObjectInputStream.serializablePackages);
    private boolean trustAllPackages = false;
184
185    // /////////////////////////////////////////////
186    //
187    // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
188    //
189    // /////////////////////////////////////////////
190
191    public ActiveMQConnectionFactory() {
192        this(DEFAULT_BROKER_URL);
193    }
194
195    public ActiveMQConnectionFactory(String brokerURL) {
196        this(createURI(brokerURL));
197    }
198
199    public ActiveMQConnectionFactory(URI brokerURL) {
200        setBrokerURL(brokerURL.toString());
201    }
202
203    public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
204        setUserName(userName);
205        setPassword(password);
206        setBrokerURL(brokerURL.toString());
207    }
208
209    public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
210        setUserName(userName);
211        setPassword(password);
212        setBrokerURL(brokerURL);
213    }
214
215    /**
216     * Returns a copy of the given connection factory
217     */
218    public ActiveMQConnectionFactory copy() {
219        try {
220            return (ActiveMQConnectionFactory)super.clone();
221        } catch (CloneNotSupportedException e) {
222            throw new RuntimeException("This should never happen: " + e, e);
223        }
224    }
225
226    /*boolean*
227     * @param brokerURL
228     * @return
229     * @throws URISyntaxException
230     */
231    private static URI createURI(String brokerURL) {
232        try {
233            return new URI(brokerURL);
234        } catch (URISyntaxException e) {
235            throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
236        }
237    }
238
239    /**
240     * @return Returns the Connection.
241     */
242    @Override
243    public Connection createConnection() throws JMSException {
244        return createActiveMQConnection();
245    }
246
247    /**
248     * @return Returns the Connection.
249     */
250    @Override
251    public Connection createConnection(String userName, String password) throws JMSException {
252        return createActiveMQConnection(userName, password);
253    }
254
255    /**
256     * @return Returns the QueueConnection.
257     * @throws JMSException
258     */
259    @Override
260    public QueueConnection createQueueConnection() throws JMSException {
261        return createActiveMQConnection().enforceQueueOnlyConnection();
262    }
263
264    /**
265     * @return Returns the QueueConnection.
266     */
267    @Override
268    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
269        return createActiveMQConnection(userName, password).enforceQueueOnlyConnection();
270    }
271
272    /**
273     * @return Returns the TopicConnection.
274     * @throws JMSException
275     */
276    @Override
277    public TopicConnection createTopicConnection() throws JMSException {
278        return createActiveMQConnection();
279    }
280
281    /**
282     * @return Returns the TopicConnection.
283     */
284    @Override
285    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
286        return createActiveMQConnection(userName, password);
287    }
288
289    /**
290     * @return the StatsImpl associated with this ConnectionFactory.
291     */
292    @Override
293    public StatsImpl getStats() {
294        return this.factoryStats;
295    }
296
297    // /////////////////////////////////////////////
298    //
299    // Implementation methods.
300    //
301    // /////////////////////////////////////////////
302
303    protected ActiveMQConnection createActiveMQConnection() throws JMSException {
304        return createActiveMQConnection(userName, password);
305    }
306
307    /**
308     * Creates a Transport based on this object's connection settings. Separated
309     * from createActiveMQConnection to allow for subclasses to override.
310     *
311     * @return The newly created Transport.
312     * @throws JMSException If unable to create trasnport.
313     */
314    protected Transport createTransport() throws JMSException {
315        try {
316            URI connectBrokerUL = brokerURL;
317            String scheme = brokerURL.getScheme();
318            if (scheme == null) {
319                throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
320            }
321            if (scheme.equals("auto")) {
322                connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
323            } else if (scheme.equals("auto+ssl")) {
324                connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
325            } else if (scheme.equals("auto+nio")) {
326                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
327            } else if (scheme.equals("auto+nio+ssl")) {
328                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
329            }
330
331            return TransportFactory.connect(connectBrokerUL);
332        } catch (Exception e) {
333            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
334        }
335    }
336
337    /**
338     * @return Returns the Connection.
339     */
340    protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
341        if (brokerURL == null) {
342            throw new ConfigurationException("brokerURL not set.");
343        }
344        ActiveMQConnection connection = null;
345        try {
346            Transport transport = createTransport();
347            connection = createActiveMQConnection(transport, factoryStats);
348
349            connection.setUserName(userName);
350            connection.setPassword(password);
351
352            configureConnection(connection);
353
354            transport.start();
355
356            if (clientID != null) {
357                connection.setDefaultClientID(clientID);
358            }
359
360            return connection;
361        } catch (JMSException e) {
362            // Clean up!
363            try {
364                connection.close();
365            } catch (Throwable ignore) {
366            }
367            throw e;
368        } catch (Exception e) {
369            // Clean up!
370            try {
371                connection.close();
372            } catch (Throwable ignore) {
373            }
374            throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
375        }
376    }
377
378    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
379        ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
380                getConnectionIdGenerator(), stats);
381        return connection;
382    }
383
    /**
     * Copies this factory's settings onto a newly created connection by
     * calling the matching setter for each option, then registers the
     * optional listeners that have been configured.
     *
     * @param connection the connection to configure
     * @throws JMSException declared for the setters and for subclasses that override this method
     */
    protected void configureConnection(ActiveMQConnection connection) throws JMSException {
        connection.setPrefetchPolicy(getPrefetchPolicy());
        connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
        connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
        connection.setCopyMessageOnSend(isCopyMessageOnSend());
        connection.setUseCompression(isUseCompression());
        connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
        connection.setDispatchAsync(isDispatchAsync());
        connection.setUseAsyncSend(isUseAsyncSend());
        connection.setAlwaysSyncSend(isAlwaysSyncSend());
        connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
        connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
        connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
        connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
        connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
        connection.setExclusiveConsumer(isExclusiveConsumer());
        connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
        connection.setTransformer(getTransformer());
        // Unlike the other policies, the blob transfer policy is handed over as a copy.
        connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
        connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
        connection.setProducerWindowSize(getProducerWindowSize());
        connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
        connection.setSendTimeout(getSendTimeout());
        connection.setCloseTimeout(getCloseTimeout());
        connection.setSendAcksAsync(isSendAcksAsync());
        connection.setAuditDepth(getAuditDepth());
        connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
        connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
        connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
        connection.setCheckForDuplicates(isCheckForDuplicates());
        connection.setMessagePrioritySupported(isMessagePrioritySupported());
        connection.setTransactedIndividualAck(isTransactedIndividualAck());
        connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
        connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
        connection.setSessionTaskRunner(getSessionTaskRunner());
        connection.setRejectedTaskHandler(getRejectedTaskHandler());
        connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
        connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
        connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled());
        connection.setTrustedPackages(getTrustedPackages());
        connection.setTrustAllPackages(isTrustAllPackages());
        connection.setConnectResponseTimeout(getConnectResponseTimeout());
        // Listeners are optional: each is only registered when one was set on the factory.
        if (transportListener != null) {
            connection.addTransportListener(transportListener);
        }
        if (exceptionListener != null) {
            connection.setExceptionListener(exceptionListener);
        }
        if (clientInternalExceptionListener != null) {
            connection.setClientInternalExceptionListener(clientInternalExceptionListener);
        }
    }
436
437    // /////////////////////////////////////////////
438    //
439    // Property Accessors
440    //
441    // /////////////////////////////////////////////
442
443        public String getBrokerURL() {
444        return brokerURL == null ? null : brokerURL.toString();
445    }
446
447    /**
448     * Sets the <a
449     * href="http://activemq.apache.org/configuring-transports.html">connection
450     * URL</a> used to connect to the ActiveMQ broker.
451     */
452    public void setBrokerURL(String brokerURL) {
453        this.brokerURL = createURI(brokerURL);
454
455        // Use all the properties prefixed with 'jms.' to set the connection
456        // factory
457        // options.
458        if (this.brokerURL.getQuery() != null) {
459            // It might be a standard URI or...
460            try {
461
462                Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery());
463                Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms.");
464                if (buildFromMap(jmsOptionsMap)) {
465                    if (!jmsOptionsMap.isEmpty()) {
466                        String msg = "There are " + jmsOptionsMap.size()
467                            + " jms options that couldn't be set on the ConnectionFactory."
468                            + " Check the options are spelled correctly."
469                            + " Unknown parameters=[" + jmsOptionsMap + "]."
470                            + " This connection factory cannot be started.";
471                        throw new IllegalArgumentException(msg);
472                    }
473
474                    this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
475                }
476
477            } catch (URISyntaxException e) {
478            }
479
480        } else {
481
482            // It might be a composite URI.
483            try {
484                CompositeData data = URISupport.parseComposite(this.brokerURL);
485                Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms.");
486                if (buildFromMap(jmsOptionsMap)) {
487                    if (!jmsOptionsMap.isEmpty()) {
488                        String msg = "There are " + jmsOptionsMap.size()
489                            + " jms options that couldn't be set on the ConnectionFactory."
490                            + " Check the options are spelled correctly."
491                            + " Unknown parameters=[" + jmsOptionsMap + "]."
492                            + " This connection factory cannot be started.";
493                        throw new IllegalArgumentException(msg);
494                    }
495
496                    this.brokerURL = data.toURI();
497                }
498            } catch (URISyntaxException e) {
499            }
500        }
501    }
502
503    public String getClientID() {
504        return clientID;
505    }
506
507    /**
508     * Sets the JMS clientID to use for the created connection. Note that this
509     * can only be used by one connection at once so generally its a better idea
510     * to set the clientID on a Connection
511     */
512    public void setClientID(String clientID) {
513        this.clientID = clientID;
514    }
515
516    public boolean isCopyMessageOnSend() {
517        return copyMessageOnSend;
518    }
519
520    /**
521     * Should a JMS message be copied to a new JMS Message object as part of the
522     * send() method in JMS. This is enabled by default to be compliant with the
523     * JMS specification. You can disable it if you do not mutate JMS messages
524     * after they are sent for a performance boost
525     */
526    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
527        this.copyMessageOnSend = copyMessageOnSend;
528    }
529
530    public boolean isDisableTimeStampsByDefault() {
531        return disableTimeStampsByDefault;
532    }
533
534    /**
535     * Sets whether or not timestamps on messages should be disabled or not. If
536     * you disable them it adds a small performance boost.
537     */
538    public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
539        this.disableTimeStampsByDefault = disableTimeStampsByDefault;
540    }
541
542    public boolean isOptimizedMessageDispatch() {
543        return optimizedMessageDispatch;
544    }
545
546    /**
547     * If this flag is set then an larger prefetch limit is used - only
548     * applicable for durable topic subscribers.
549     */
550    public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
551        this.optimizedMessageDispatch = optimizedMessageDispatch;
552    }
553
554    public String getPassword() {
555        return password;
556    }
557
558    /**
559     * Sets the JMS password used for connections created from this factory
560     */
561    public void setPassword(String password) {
562        this.password = password;
563    }
564
565    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
566        return prefetchPolicy;
567    }
568
569    /**
570     * Sets the <a
571     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
572     * policy</a> for consumers created by this connection.
573     */
574    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
575        this.prefetchPolicy = prefetchPolicy;
576    }
577
578    public boolean isUseAsyncSend() {
579        return useAsyncSend;
580    }
581
582    public BlobTransferPolicy getBlobTransferPolicy() {
583        return blobTransferPolicy;
584    }
585
586    /**
587     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
588     * OBjects) are transferred from producers to brokers to consumers
589     */
590    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
591        this.blobTransferPolicy = blobTransferPolicy;
592    }
593
594    /**
595     * Forces the use of <a
596     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
597     * adds a massive performance boost; but means that the send() method will
598     * return immediately whether the message has been sent or not which could
599     * lead to message loss.
600     */
601    public void setUseAsyncSend(boolean useAsyncSend) {
602        this.useAsyncSend = useAsyncSend;
603    }
604
605    public synchronized boolean isWatchTopicAdvisories() {
606        return watchTopicAdvisories;
607    }
608
609    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
610        this.watchTopicAdvisories = watchTopicAdvisories;
611    }
612
613    /**
614     * @return true if always sync send messages
615     */
616    public boolean isAlwaysSyncSend() {
617        return this.alwaysSyncSend;
618    }
619
620    /**
621     * Set true if always require messages to be sync sent
622     *
623     * @param alwaysSyncSend
624     */
625    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
626        this.alwaysSyncSend = alwaysSyncSend;
627    }
628
629    public String getUserName() {
630        return userName;
631    }
632
633    /**
634     * Sets the JMS userName used by connections created by this factory
635     */
636    public void setUserName(String userName) {
637        this.userName = userName;
638    }
639
640    public boolean isUseRetroactiveConsumer() {
641        return useRetroactiveConsumer;
642    }
643
644    /**
645     * Sets whether or not retroactive consumers are enabled. Retroactive
646     * consumers allow non-durable topic subscribers to receive old messages
647     * that were published before the non-durable subscriber started.
648     */
649    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
650        this.useRetroactiveConsumer = useRetroactiveConsumer;
651    }
652
653    public boolean isExclusiveConsumer() {
654        return exclusiveConsumer;
655    }
656
657    /**
658     * Enables or disables whether or not queue consumers should be exclusive or
659     * not for example to preserve ordering when not using <a
660     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
661     *
662     * @param exclusiveConsumer
663     */
664    public void setExclusiveConsumer(boolean exclusiveConsumer) {
665        this.exclusiveConsumer = exclusiveConsumer;
666    }
667
668    public RedeliveryPolicy getRedeliveryPolicy() {
669        return redeliveryPolicyMap.getDefaultEntry();
670    }
671
672    /**
673     * Sets the global default redelivery policy to be used when a message is delivered
674     * but the session is rolled back
675     */
676    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
677        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
678    }
679
680    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
681        return this.redeliveryPolicyMap;
682    }
683
684    /**
685     * Sets the global redelivery policy mapping to be used when a message is delivered
686     * but the session is rolled back
687     */
688    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
689        this.redeliveryPolicyMap = redeliveryPolicyMap;
690    }
691
692    public MessageTransformer getTransformer() {
693        return transformer;
694    }
695
696    /**
697     * @return the sendTimeout (in milliseconds)
698     */
699    public int getSendTimeout() {
700        return sendTimeout;
701    }
702
703    /**
704     * @param sendTimeout the sendTimeout to set (in milliseconds)
705     */
706    public void setSendTimeout(int sendTimeout) {
707        this.sendTimeout = sendTimeout;
708    }
709
710    /**
711     * @return the sendAcksAsync
712     */
713    public boolean isSendAcksAsync() {
714        return sendAcksAsync;
715    }
716
717    /**
718     * @param sendAcksAsync the sendAcksAsync to set
719     */
720    public void setSendAcksAsync(boolean sendAcksAsync) {
721        this.sendAcksAsync = sendAcksAsync;
722    }
723
724    /**
725     * @return the messagePrioritySupported
726     */
727    public boolean isMessagePrioritySupported() {
728        return this.messagePrioritySupported;
729    }
730
731    /**
732     * @param messagePrioritySupported the messagePrioritySupported to set
733     */
734    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
735        this.messagePrioritySupported = messagePrioritySupported;
736    }
737
738
739    /**
740     * Sets the transformer used to transform messages before they are sent on
741     * to the JMS bus or when they are received from the bus but before they are
742     * delivered to the JMS client
743     */
744    public void setTransformer(MessageTransformer transformer) {
745        this.transformer = transformer;
746    }
747
748    @SuppressWarnings({ "unchecked", "rawtypes" })
749    @Override
750    public void buildFromProperties(Properties properties) {
751
752        if (properties == null) {
753            properties = new Properties();
754        }
755
756        String temp = properties.getProperty(Context.PROVIDER_URL);
757        if (temp == null || temp.length() == 0) {
758            temp = properties.getProperty("brokerURL");
759        }
760        if (temp != null && temp.length() > 0) {
761            setBrokerURL(temp);
762        }
763
764        Map<String, Object> p = new HashMap(properties);
765        buildFromMap(p);
766    }
767
768    public boolean buildFromMap(Map<String, Object> properties) {
769        boolean rc = false;
770
771        ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
772        if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
773            setPrefetchPolicy(p);
774            rc = true;
775        }
776
777        RedeliveryPolicy rp = new RedeliveryPolicy();
778        if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
779            setRedeliveryPolicy(rp);
780            rc = true;
781        }
782
783        BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
784        if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
785            setBlobTransferPolicy(blobTransferPolicy);
786            rc = true;
787        }
788
789        rc |= IntrospectionSupport.setProperties(this, properties);
790
791        return rc;
792    }
793
794    @Override
795    public void populateProperties(Properties props) {
796        props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
797
798        if (getBrokerURL() != null) {
799            props.setProperty(Context.PROVIDER_URL, getBrokerURL());
800            props.setProperty("brokerURL", getBrokerURL());
801        }
802
803        if (getClientID() != null) {
804            props.setProperty("clientID", getClientID());
805        }
806
807        IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
808        IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
809        IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
810
811        props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
812        props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
813        props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
814        props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
815
816        if (getPassword() != null) {
817            props.setProperty("password", getPassword());
818        }
819
820        props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
821        props.setProperty("useCompression", Boolean.toString(isUseCompression()));
822        props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
823        props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
824
825        if (getUserName() != null) {
826            props.setProperty("userName", getUserName());
827        }
828
829        props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
830        props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
831        props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
832        props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
833        props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
834        props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
835        props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
836        props.setProperty("connectResponseTimeout", Integer.toString(getConnectResponseTimeout()));
837        props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
838        props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
839        props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
840        props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
841        props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
842        props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck()));
843        props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery()));
844        props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize()));
845        props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
846        props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod()));
847        props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId()));
848        props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled()));
849    }
850
851    public boolean isUseCompression() {
852        return useCompression;
853    }
854
855    /**
856     * Enables the use of compression of the message bodies
857     */
858    public void setUseCompression(boolean useCompression) {
859        this.useCompression = useCompression;
860    }
861
862    public boolean isObjectMessageSerializationDefered() {
863        return objectMessageSerializationDefered;
864    }
865
866    /**
867     * When an object is set on an ObjectMessage, the JMS spec requires the
868     * object to be serialized by that set method. Enabling this flag causes the
869     * object to not get serialized. The object may subsequently get serialized
870     * if the message needs to be sent over a socket or stored to disk.
871     */
872    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
873        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
874    }
875
876    public boolean isDispatchAsync() {
877        return dispatchAsync;
878    }
879
880    /**
881     * Enables or disables the default setting of whether or not consumers have
882     * their messages <a
883     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
884     * synchronously or asynchronously by the broker</a>. For non-durable
885     * topics for example we typically dispatch synchronously by default to
886     * minimize context switches which boost performance. However sometimes its
887     * better to go slower to ensure that a single blocked consumer socket does
888     * not block delivery to other consumers.
889     *
890     * @param asyncDispatch If true then consumers created on this connection
891     *                will default to having their messages dispatched
892     *                asynchronously. The default value is true.
893     */
894    public void setDispatchAsync(boolean asyncDispatch) {
895        this.dispatchAsync = asyncDispatch;
896    }
897
898    /**
899     * @return Returns the closeTimeout.
900     */
901    public int getCloseTimeout() {
902        return closeTimeout;
903    }
904
905    /**
906     * Sets the timeout before a close is considered complete. Normally a
907     * close() on a connection waits for confirmation from the broker; this
908     * allows that operation to timeout to save the client hanging if there is
909     * no broker
910     */
911    public void setCloseTimeout(int closeTimeout) {
912        this.closeTimeout = closeTimeout;
913    }
914
915    /**
916     * @return Returns the alwaysSessionAsync.
917     */
918    public boolean isAlwaysSessionAsync() {
919        return alwaysSessionAsync;
920    }
921
922    /**
923     * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
924     * the Connection. However, a separate thread is always used if there is more than one session, or the session
925     * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
926     * happens asynchronously.
927     */
928    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
929        this.alwaysSessionAsync = alwaysSessionAsync;
930    }
931
932    /**
933     * @return Returns the optimizeAcknowledge.
934     */
935    public boolean isOptimizeAcknowledge() {
936        return optimizeAcknowledge;
937    }
938
939    /**
940     * @param optimizeAcknowledge The optimizeAcknowledge to set.
941     */
942    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
943        this.optimizeAcknowledge = optimizeAcknowledge;
944    }
945
946    /**
947     * The max time in milliseconds between optimized ack batches
948     * @param optimizeAcknowledgeTimeOut
949     */
950    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
951        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
952    }
953
954    public long getOptimizeAcknowledgeTimeOut() {
955        return optimizeAcknowledgeTimeOut;
956    }
957
958    public boolean isNestedMapAndListEnabled() {
959        return nestedMapAndListEnabled;
960    }
961
962    /**
963     * Enables/disables whether or not Message properties and MapMessage entries
964     * support <a
965     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
966     * Structures</a> of Map and List objects
967     */
968    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
969        this.nestedMapAndListEnabled = structuredMapsEnabled;
970    }
971
972    public String getClientIDPrefix() {
973        return clientIDPrefix;
974    }
975
976    /**
977     * Sets the prefix used by autogenerated JMS Client ID values which are used
978     * if the JMS client does not explicitly specify on.
979     *
980     * @param clientIDPrefix
981     */
982    public void setClientIDPrefix(String clientIDPrefix) {
983        this.clientIDPrefix = clientIDPrefix;
984    }
985
986    protected synchronized IdGenerator getClientIdGenerator() {
987        if (clientIdGenerator == null) {
988            if (clientIDPrefix != null) {
989                clientIdGenerator = new IdGenerator(clientIDPrefix);
990            } else {
991                clientIdGenerator = new IdGenerator();
992            }
993        }
994        return clientIdGenerator;
995    }
996
997    protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
998        this.clientIdGenerator = clientIdGenerator;
999    }
1000
1001    /**
1002     * Sets the prefix used by connection id generator
1003     * @param connectionIDPrefix
1004     */
1005    public void setConnectionIDPrefix(String connectionIDPrefix) {
1006        this.connectionIDPrefix = connectionIDPrefix;
1007    }
1008
1009    protected synchronized IdGenerator getConnectionIdGenerator() {
1010        if (connectionIdGenerator == null) {
1011            if (connectionIDPrefix != null) {
1012                connectionIdGenerator = new IdGenerator(connectionIDPrefix);
1013            } else {
1014                connectionIdGenerator = new IdGenerator();
1015            }
1016        }
1017        return connectionIdGenerator;
1018    }
1019
1020    protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
1021        this.connectionIdGenerator = connectionIdGenerator;
1022    }
1023
1024    /**
1025     * @return the statsEnabled
1026     */
1027    public boolean isStatsEnabled() {
1028        return this.factoryStats.isEnabled();
1029    }
1030
1031    /**
1032     * @param statsEnabled the statsEnabled to set
1033     */
1034    public void setStatsEnabled(boolean statsEnabled) {
1035        this.factoryStats.setEnabled(statsEnabled);
1036    }
1037
1038    public synchronized int getProducerWindowSize() {
1039        return producerWindowSize;
1040    }
1041
1042    public synchronized void setProducerWindowSize(int producerWindowSize) {
1043        this.producerWindowSize = producerWindowSize;
1044    }
1045
1046    public long getWarnAboutUnstartedConnectionTimeout() {
1047        return warnAboutUnstartedConnectionTimeout;
1048    }
1049
1050    /**
1051     * Enables the timeout from a connection creation to when a warning is
1052     * generated if the connection is not properly started via
1053     * {@link Connection#start()} and a message is received by a consumer. It is
1054     * a very common gotcha to forget to <a
1055     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1056     * the connection</a> so this option makes the default case to create a
1057     * warning if the user forgets. To disable the warning just set the value to <
1058     * 0 (say -1).
1059     */
1060    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1061        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1062    }
1063
1064    public TransportListener getTransportListener() {
1065        return transportListener;
1066    }
1067
1068    /**
1069     * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
1070     * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
1071     * a transport listener.
1072     *
1073     * @param transportListener sets the listener to be registered on all connections
1074     * created by this factory
1075     */
1076    public void setTransportListener(TransportListener transportListener) {
1077        this.transportListener = transportListener;
1078    }
1079
1080
1081    public ExceptionListener getExceptionListener() {
1082        return exceptionListener;
1083    }
1084
1085    /**
1086     * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
1087     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1088     * an exception listener.
1089     * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
1090     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1091     * @param exceptionListener sets the exception listener to be registered on all connections
1092     * created by this factory
1093     */
1094    public void setExceptionListener(ExceptionListener exceptionListener) {
1095        this.exceptionListener = exceptionListener;
1096    }
1097
1098    public int getAuditDepth() {
1099        return auditDepth;
1100    }
1101
1102    public void setAuditDepth(int auditDepth) {
1103        this.auditDepth = auditDepth;
1104    }
1105
1106    public int getAuditMaximumProducerNumber() {
1107        return auditMaximumProducerNumber;
1108    }
1109
1110    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
1111        this.auditMaximumProducerNumber = auditMaximumProducerNumber;
1112    }
1113
1114    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1115        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1116    }
1117
1118    public boolean isUseDedicatedTaskRunner() {
1119        return useDedicatedTaskRunner;
1120    }
1121
1122    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
1123        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
1124    }
1125
1126    public long getConsumerFailoverRedeliveryWaitPeriod() {
1127        return consumerFailoverRedeliveryWaitPeriod;
1128    }
1129
1130    public ClientInternalExceptionListener getClientInternalExceptionListener() {
1131        return clientInternalExceptionListener;
1132    }
1133
1134    /**
1135     * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
1136     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1137     * an exception listener.
1138     * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
1139     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1140     * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
1141     * created by this factory
1142     */
1143    public void setClientInternalExceptionListener(
1144            ClientInternalExceptionListener clientInternalExceptionListener) {
1145        this.clientInternalExceptionListener = clientInternalExceptionListener;
1146    }
1147
1148    /**
1149     * @return the checkForDuplicates
1150     */
1151    public boolean isCheckForDuplicates() {
1152        return this.checkForDuplicates;
1153    }
1154
1155    /**
1156     * @param checkForDuplicates the checkForDuplicates to set
1157     */
1158    public void setCheckForDuplicates(boolean checkForDuplicates) {
1159        this.checkForDuplicates = checkForDuplicates;
1160    }
1161
1162    public boolean isTransactedIndividualAck() {
1163         return transactedIndividualAck;
1164     }
1165
1166     /**
1167      * when true, submit individual transacted acks immediately rather than with transaction completion.
1168      * This allows the acks to represent delivery status which can be persisted on rollback
1169      * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean)  true
1170      */
1171     public void setTransactedIndividualAck(boolean transactedIndividualAck) {
1172         this.transactedIndividualAck = transactedIndividualAck;
1173     }
1174
1175
1176     public boolean isNonBlockingRedelivery() {
1177         return nonBlockingRedelivery;
1178     }
1179
1180     /**
1181      * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
1182      * from a rolled back transaction.  This implies that message order will not be preserved and
1183      * also will result in the TransactedIndividualAck option to be enabled.
1184      */
1185     public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
1186         this.nonBlockingRedelivery = nonBlockingRedelivery;
1187     }
1188
1189    public int getMaxThreadPoolSize() {
1190        return maxThreadPoolSize;
1191    }
1192
1193    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
1194        this.maxThreadPoolSize = maxThreadPoolSize;
1195    }
1196
1197    public TaskRunnerFactory getSessionTaskRunner() {
1198        return sessionTaskRunner;
1199    }
1200
1201    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1202        this.sessionTaskRunner = sessionTaskRunner;
1203    }
1204
1205    public RejectedExecutionHandler getRejectedTaskHandler() {
1206        return rejectedTaskHandler;
1207    }
1208
1209    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
1210        this.rejectedTaskHandler = rejectedTaskHandler;
1211    }
1212
1213    /**
1214     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
1215     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
1216     * will not do any background Message acknowledgment.
1217     *
1218     * @return the scheduledOptimizedAckInterval
1219     */
1220    public long getOptimizedAckScheduledAckInterval() {
1221        return optimizedAckScheduledAckInterval;
1222    }
1223
1224    /**
1225     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
1226     * have been configured with optimizeAcknowledge enabled.
1227     *
1228     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
1229     */
1230    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
1231        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
1232    }
1233
1234
1235    public boolean isRmIdFromConnectionId() {
1236        return rmIdFromConnectionId;
1237    }
1238
1239    /**
1240     * uses the connection id as the resource identity for XAResource.isSameRM
1241     * ensuring join will only occur on a single connection
1242     */
1243    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
1244        this.rmIdFromConnectionId = rmIdFromConnectionId;
1245    }
1246
1247    /**
1248     * @return true if MessageConsumer instance will check for expired messages before dispatch.
1249     */
1250    public boolean isConsumerExpiryCheckEnabled() {
1251        return consumerExpiryCheckEnabled;
1252    }
1253
1254    /**
1255     * Controls whether message expiration checking is done in each MessageConsumer
1256     * prior to dispatching a message.  Disabling this check can lead to consumption
1257     * of expired messages.
1258     *
1259     * @param consumerExpiryCheckEnabled
1260     *        controls whether expiration checking is done prior to dispatch.
1261     */
1262    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
1263        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
1264    }
1265
1266    public List<String> getTrustedPackages() {
1267        return trustedPackages;
1268    }
1269
1270    public void setTrustedPackages(List<String> trustedPackages) {
1271        this.trustedPackages = trustedPackages;
1272    }
1273
1274    public boolean isTrustAllPackages() {
1275        return trustAllPackages;
1276    }
1277
1278    public void setTrustAllPackages(boolean trustAllPackages) {
1279        this.trustAllPackages = trustAllPackages;
1280    }
1281
1282        public int getConnectResponseTimeout() {
1283                return connectResponseTimeout;
1284        }
1285
1286        public void setConnectResponseTimeout(int connectResponseTimeout) {
1287                this.connectResponseTimeout = connectResponseTimeout;
1288        }
1289}