001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.security.AccessController;
023import java.security.PrivilegedAction;
024import java.util.HashMap;
025import java.util.Map;
026import java.util.Properties;
027import java.util.concurrent.RejectedExecutionHandler;
028
029import javax.jms.Connection;
030import javax.jms.ConnectionFactory;
031import javax.jms.ExceptionListener;
032import javax.jms.JMSException;
033import javax.jms.QueueConnection;
034import javax.jms.QueueConnectionFactory;
035import javax.jms.TopicConnection;
036import javax.jms.TopicConnectionFactory;
037import javax.naming.Context;
038
039import org.apache.activemq.blob.BlobTransferPolicy;
040import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
041import org.apache.activemq.jndi.JNDIBaseStorable;
042import org.apache.activemq.management.JMSStatsImpl;
043import org.apache.activemq.management.StatsCapable;
044import org.apache.activemq.management.StatsImpl;
045import org.apache.activemq.thread.TaskRunnerFactory;
046import org.apache.activemq.transport.Transport;
047import org.apache.activemq.transport.TransportFactory;
048import org.apache.activemq.transport.TransportListener;
049import org.apache.activemq.util.IdGenerator;
050import org.apache.activemq.util.IntrospectionSupport;
051import org.apache.activemq.util.JMSExceptionSupport;
052import org.apache.activemq.util.URISupport;
053import org.apache.activemq.util.URISupport.CompositeData;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057/**
 * A ConnectionFactory is an Administered object, and is used for creating
 * Connections. <p/> This class also implements QueueConnectionFactory and
 * TopicConnectionFactory. You can use this connection factory to create both
 * QueueConnections and TopicConnections.
062 *
063 *
064 * @see javax.jms.ConnectionFactory
065 */
066public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
067    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnectionFactory.class);
068    private static final String DEFAULT_BROKER_HOST;
069    private static final int DEFAULT_BROKER_PORT;
070    static{
071        String host = null;
072        String port = null;
073        try {
074             host = AccessController.doPrivileged(new PrivilegedAction<String>() {
075                 @Override
076                 public String run() {
077                     String result = System.getProperty("org.apache.activemq.AMQ_HOST");
078                     result = (result==null||result.isEmpty()) ?  System.getProperty("AMQ_HOST","localhost") : result;
079                     return result;
080                 }
081             });
082             port = AccessController.doPrivileged(new PrivilegedAction<String>() {
083                 @Override
084                 public String run() {
085                     String result = System.getProperty("org.apache.activemq.AMQ_PORT");
086                     result = (result==null||result.isEmpty()) ?  System.getProperty("AMQ_PORT","61616") : result;
087                     return result;
088                 }
089             });
090        }catch(Throwable e){
091            LOG.debug("Failed to look up System properties for host and port",e);
092        }
093        host = (host == null || host.isEmpty()) ? "localhost" : host;
094        port = (port == null || port.isEmpty()) ? "61616" : port;
095        DEFAULT_BROKER_HOST = host;
096        DEFAULT_BROKER_PORT = Integer.parseInt(port);
097    }
098
099
100    public static final String DEFAULT_BROKER_BIND_URL;
101
102    static{
103        final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
104        String bindURL = null;
105
106        try {
107            bindURL = AccessController.doPrivileged(new PrivilegedAction<String>() {
108                @Override
109                public String run() {
110                    String result = System.getProperty("org.apache.activemq.BROKER_BIND_URL");
111                    result = (result==null||result.isEmpty()) ?  System.getProperty("BROKER_BIND_URL",defaultURL) : result;
112                    return result;
113                }
114            });
115        }catch(Throwable e){
116            LOG.debug("Failed to look up System properties for host and port",e);
117        }
118        bindURL = (bindURL == null || bindURL.isEmpty()) ? defaultURL : bindURL;
119        DEFAULT_BROKER_BIND_URL = bindURL;
120    }
121
    // Default client URL: the default bind URL prefixed with the "failover://" scheme.
    public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
    // No default credentials.
    public static final String DEFAULT_USER = null;
    public static final String DEFAULT_PASSWORD = null;
    public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;

    // Connection identity; userName/password are the defaults used by the
    // no-argument create*Connection() methods.
    protected URI brokerURL;
    protected String userName;
    protected String password;
    protected String clientID;
    protected boolean dispatchAsync=true;
    protected boolean alwaysSessionAsync=true;

    // Statistics object handed to every connection created by this factory.
    JMSStatsImpl factoryStats = new JMSStatsImpl();

    private IdGenerator clientIdGenerator;
    private String clientIDPrefix;
    private IdGenerator connectionIdGenerator;
    private String connectionIDPrefix;

    // client policies
    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
    {
        // Instance initializer: must stay after the redeliveryPolicyMap field so the
        // map exists when its default entry is installed.
        redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy());
    }
    private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
    private MessageTransformer transformer;

    // Per-connection options; configureConnection(...) copies them onto each new connection.
    private boolean disableTimeStampsByDefault;
    private boolean optimizedMessageDispatch = true;
    private long optimizeAcknowledgeTimeOut = 300;
    private long optimizedAckScheduledAckInterval = 0;
    private boolean copyMessageOnSend = true;
    private boolean useCompression;
    private boolean objectMessageSerializationDefered;
    private boolean useAsyncSend;
    private boolean optimizeAcknowledge;
    private int closeTimeout = 15000;
    private boolean useRetroactiveConsumer;
    private boolean exclusiveConsumer;
    private boolean nestedMapAndListEnabled = true;
    private boolean alwaysSyncSend;
    private boolean watchTopicAdvisories = true;
    private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
    private long warnAboutUnstartedConnectionTimeout = 500L;
    private int sendTimeout = 0;
    private boolean sendAcksAsync=true;
    // Optional listeners; only registered on a connection when non-null.
    private TransportListener transportListener;
    private ExceptionListener exceptionListener;
    private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
    private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
    private boolean useDedicatedTaskRunner;
    private long consumerFailoverRedeliveryWaitPeriod = 0;
    private boolean checkForDuplicates = true;
    private ClientInternalExceptionListener clientInternalExceptionListener;
    private boolean messagePrioritySupported = false;
    private boolean transactedIndividualAck = false;
    private boolean nonBlockingRedelivery = false;
    private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
    private TaskRunnerFactory sessionTaskRunner;
    private RejectedExecutionHandler rejectedTaskHandler = null;
    protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class
    private boolean rmIdFromConnectionId = false;
    private boolean consumerExpiryCheckEnabled = true;
186
187    // /////////////////////////////////////////////
188    //
189    // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
190    //
191    // /////////////////////////////////////////////
192
193    public ActiveMQConnectionFactory() {
194        this(DEFAULT_BROKER_URL);
195    }
196
197    public ActiveMQConnectionFactory(String brokerURL) {
198        this(createURI(brokerURL));
199    }
200
201    public ActiveMQConnectionFactory(URI brokerURL) {
202        setBrokerURL(brokerURL.toString());
203    }
204
205    public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
206        setUserName(userName);
207        setPassword(password);
208        setBrokerURL(brokerURL.toString());
209    }
210
211    public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
212        setUserName(userName);
213        setPassword(password);
214        setBrokerURL(brokerURL);
215    }
216
217    /**
218     * Returns a copy of the given connection factory
219     */
220    public ActiveMQConnectionFactory copy() {
221        try {
222            return (ActiveMQConnectionFactory)super.clone();
223        } catch (CloneNotSupportedException e) {
224            throw new RuntimeException("This should never happen: " + e, e);
225        }
226    }
227
228    /*boolean*
229     * @param brokerURL
230     * @return
231     * @throws URISyntaxException
232     */
233    private static URI createURI(String brokerURL) {
234        try {
235            return new URI(brokerURL);
236        } catch (URISyntaxException e) {
237            throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
238        }
239    }
240
241    /**
242     * @return Returns the Connection.
243     */
244    @Override
245    public Connection createConnection() throws JMSException {
246        return createActiveMQConnection();
247    }
248
249    /**
250     * @return Returns the Connection.
251     */
252    @Override
253    public Connection createConnection(String userName, String password) throws JMSException {
254        return createActiveMQConnection(userName, password);
255    }
256
257    /**
258     * @return Returns the QueueConnection.
259     * @throws JMSException
260     */
261    @Override
262    public QueueConnection createQueueConnection() throws JMSException {
263        return createActiveMQConnection().enforceQueueOnlyConnection();
264    }
265
266    /**
267     * @return Returns the QueueConnection.
268     */
269    @Override
270    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
271        return createActiveMQConnection(userName, password).enforceQueueOnlyConnection();
272    }
273
274    /**
275     * @return Returns the TopicConnection.
276     * @throws JMSException
277     */
278    @Override
279    public TopicConnection createTopicConnection() throws JMSException {
280        return createActiveMQConnection();
281    }
282
283    /**
284     * @return Returns the TopicConnection.
285     */
286    @Override
287    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
288        return createActiveMQConnection(userName, password);
289    }
290
291    /**
292     * @return the StatsImpl associated with this ConnectionFactory.
293     */
294    @Override
295    public StatsImpl getStats() {
296        return this.factoryStats;
297    }
298
299    // /////////////////////////////////////////////
300    //
301    // Implementation methods.
302    //
303    // /////////////////////////////////////////////
304
305    protected ActiveMQConnection createActiveMQConnection() throws JMSException {
306        return createActiveMQConnection(userName, password);
307    }
308
309    /**
310     * Creates a Transport based on this object's connection settings. Separated
311     * from createActiveMQConnection to allow for subclasses to override.
312     *
313     * @return The newly created Transport.
314     * @throws JMSException If unable to create trasnport.
315     */
316    protected Transport createTransport() throws JMSException {
317        try {
318            URI connectBrokerUL = brokerURL;
319            String scheme = brokerURL.getScheme();
320            if (scheme == null) {
321                throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
322            }
323            if (scheme.equals("auto")) {
324                connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
325            } else if (scheme.equals("auto+ssl")) {
326                connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
327            } else if (scheme.equals("auto+nio")) {
328                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
329            } else if (scheme.equals("auto+nio+ssl")) {
330                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
331            }
332
333            return TransportFactory.connect(connectBrokerUL);
334        } catch (Exception e) {
335            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
336        }
337    }
338
339    /**
340     * @return Returns the Connection.
341     */
342    protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
343        if (brokerURL == null) {
344            throw new ConfigurationException("brokerURL not set.");
345        }
346        ActiveMQConnection connection = null;
347        try {
348            Transport transport = createTransport();
349            connection = createActiveMQConnection(transport, factoryStats);
350
351            connection.setUserName(userName);
352            connection.setPassword(password);
353
354            configureConnection(connection);
355
356            transport.start();
357
358            if (clientID != null) {
359                connection.setDefaultClientID(clientID);
360            }
361
362            return connection;
363        } catch (JMSException e) {
364            // Clean up!
365            try {
366                connection.close();
367            } catch (Throwable ignore) {
368            }
369            throw e;
370        } catch (Exception e) {
371            // Clean up!
372            try {
373                connection.close();
374            } catch (Throwable ignore) {
375            }
376            throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
377        }
378    }
379
380    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
381        ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
382                getConnectionIdGenerator(), stats);
383        return connection;
384    }
385
386    protected void configureConnection(ActiveMQConnection connection) throws JMSException {
387        connection.setPrefetchPolicy(getPrefetchPolicy());
388        connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
389        connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
390        connection.setCopyMessageOnSend(isCopyMessageOnSend());
391        connection.setUseCompression(isUseCompression());
392        connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
393        connection.setDispatchAsync(isDispatchAsync());
394        connection.setUseAsyncSend(isUseAsyncSend());
395        connection.setAlwaysSyncSend(isAlwaysSyncSend());
396        connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
397        connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
398        connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
399        connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
400        connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
401        connection.setExclusiveConsumer(isExclusiveConsumer());
402        connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
403        connection.setTransformer(getTransformer());
404        connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
405        connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
406        connection.setProducerWindowSize(getProducerWindowSize());
407        connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
408        connection.setSendTimeout(getSendTimeout());
409        connection.setCloseTimeout(getCloseTimeout());
410        connection.setSendAcksAsync(isSendAcksAsync());
411        connection.setAuditDepth(getAuditDepth());
412        connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
413        connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
414        connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
415        connection.setCheckForDuplicates(isCheckForDuplicates());
416        connection.setMessagePrioritySupported(isMessagePrioritySupported());
417        connection.setTransactedIndividualAck(isTransactedIndividualAck());
418        connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
419        connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
420        connection.setSessionTaskRunner(getSessionTaskRunner());
421        connection.setRejectedTaskHandler(getRejectedTaskHandler());
422        connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
423        connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
424        connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled());
425        if (transportListener != null) {
426            connection.addTransportListener(transportListener);
427        }
428        if (exceptionListener != null) {
429            connection.setExceptionListener(exceptionListener);
430        }
431        if (clientInternalExceptionListener != null) {
432            connection.setClientInternalExceptionListener(clientInternalExceptionListener);
433        }
434    }
435
436    // /////////////////////////////////////////////
437    //
438    // Property Accessors
439    //
440    // /////////////////////////////////////////////
441
442    public String getBrokerURL() {
443        return brokerURL == null ? null : brokerURL.toString();
444    }
445
446    /**
447     * Sets the <a
448     * href="http://activemq.apache.org/configuring-transports.html">connection
449     * URL</a> used to connect to the ActiveMQ broker.
450     */
451    public void setBrokerURL(String brokerURL) {
452        this.brokerURL = createURI(brokerURL);
453
454        // Use all the properties prefixed with 'jms.' to set the connection
455        // factory
456        // options.
457        if (this.brokerURL.getQuery() != null) {
458            // It might be a standard URI or...
459            try {
460
461                Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery());
462                Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms.");
463                if (buildFromMap(jmsOptionsMap)) {
464                    if (!jmsOptionsMap.isEmpty()) {
465                        String msg = "There are " + jmsOptionsMap.size()
466                            + " jms options that couldn't be set on the ConnectionFactory."
467                            + " Check the options are spelled correctly."
468                            + " Unknown parameters=[" + jmsOptionsMap + "]."
469                            + " This connection factory cannot be started.";
470                        throw new IllegalArgumentException(msg);
471                    }
472
473                    this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
474                }
475
476            } catch (URISyntaxException e) {
477            }
478
479        } else {
480
481            // It might be a composite URI.
482            try {
483                CompositeData data = URISupport.parseComposite(this.brokerURL);
484                Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms.");
485                if (buildFromMap(jmsOptionsMap)) {
486                    if (!jmsOptionsMap.isEmpty()) {
487                        String msg = "There are " + jmsOptionsMap.size()
488                            + " jms options that couldn't be set on the ConnectionFactory."
489                            + " Check the options are spelled correctly."
490                            + " Unknown parameters=[" + jmsOptionsMap + "]."
491                            + " This connection factory cannot be started.";
492                        throw new IllegalArgumentException(msg);
493                    }
494
495                    this.brokerURL = data.toURI();
496                }
497            } catch (URISyntaxException e) {
498            }
499        }
500    }
501
502    public String getClientID() {
503        return clientID;
504    }
505
506    /**
507     * Sets the JMS clientID to use for the created connection. Note that this
508     * can only be used by one connection at once so generally its a better idea
509     * to set the clientID on a Connection
510     */
511    public void setClientID(String clientID) {
512        this.clientID = clientID;
513    }
514
515    public boolean isCopyMessageOnSend() {
516        return copyMessageOnSend;
517    }
518
519    /**
520     * Should a JMS message be copied to a new JMS Message object as part of the
521     * send() method in JMS. This is enabled by default to be compliant with the
522     * JMS specification. You can disable it if you do not mutate JMS messages
523     * after they are sent for a performance boost
524     */
525    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
526        this.copyMessageOnSend = copyMessageOnSend;
527    }
528
529    public boolean isDisableTimeStampsByDefault() {
530        return disableTimeStampsByDefault;
531    }
532
533    /**
534     * Sets whether or not timestamps on messages should be disabled or not. If
535     * you disable them it adds a small performance boost.
536     */
537    public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
538        this.disableTimeStampsByDefault = disableTimeStampsByDefault;
539    }
540
541    public boolean isOptimizedMessageDispatch() {
542        return optimizedMessageDispatch;
543    }
544
545    /**
546     * If this flag is set then an larger prefetch limit is used - only
547     * applicable for durable topic subscribers.
548     */
549    public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
550        this.optimizedMessageDispatch = optimizedMessageDispatch;
551    }
552
553    public String getPassword() {
554        return password;
555    }
556
557    /**
558     * Sets the JMS password used for connections created from this factory
559     */
560    public void setPassword(String password) {
561        this.password = password;
562    }
563
564    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
565        return prefetchPolicy;
566    }
567
568    /**
569     * Sets the <a
570     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
571     * policy</a> for consumers created by this connection.
572     */
573    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
574        this.prefetchPolicy = prefetchPolicy;
575    }
576
577    public boolean isUseAsyncSend() {
578        return useAsyncSend;
579    }
580
581    public BlobTransferPolicy getBlobTransferPolicy() {
582        return blobTransferPolicy;
583    }
584
585    /**
586     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
587     * OBjects) are transferred from producers to brokers to consumers
588     */
589    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
590        this.blobTransferPolicy = blobTransferPolicy;
591    }
592
593    /**
594     * Forces the use of <a
595     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
596     * adds a massive performance boost; but means that the send() method will
597     * return immediately whether the message has been sent or not which could
598     * lead to message loss.
599     */
600    public void setUseAsyncSend(boolean useAsyncSend) {
601        this.useAsyncSend = useAsyncSend;
602    }
603
604    public synchronized boolean isWatchTopicAdvisories() {
605        return watchTopicAdvisories;
606    }
607
608    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
609        this.watchTopicAdvisories = watchTopicAdvisories;
610    }
611
612    /**
613     * @return true if always sync send messages
614     */
615    public boolean isAlwaysSyncSend() {
616        return this.alwaysSyncSend;
617    }
618
619    /**
620     * Set true if always require messages to be sync sent
621     *
622     * @param alwaysSyncSend
623     */
624    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
625        this.alwaysSyncSend = alwaysSyncSend;
626    }
627
628    public String getUserName() {
629        return userName;
630    }
631
632    /**
633     * Sets the JMS userName used by connections created by this factory
634     */
635    public void setUserName(String userName) {
636        this.userName = userName;
637    }
638
639    public boolean isUseRetroactiveConsumer() {
640        return useRetroactiveConsumer;
641    }
642
643    /**
644     * Sets whether or not retroactive consumers are enabled. Retroactive
645     * consumers allow non-durable topic subscribers to receive old messages
646     * that were published before the non-durable subscriber started.
647     */
648    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
649        this.useRetroactiveConsumer = useRetroactiveConsumer;
650    }
651
652    public boolean isExclusiveConsumer() {
653        return exclusiveConsumer;
654    }
655
656    /**
657     * Enables or disables whether or not queue consumers should be exclusive or
658     * not for example to preserve ordering when not using <a
659     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
660     *
661     * @param exclusiveConsumer
662     */
663    public void setExclusiveConsumer(boolean exclusiveConsumer) {
664        this.exclusiveConsumer = exclusiveConsumer;
665    }
666
667    public RedeliveryPolicy getRedeliveryPolicy() {
668        return redeliveryPolicyMap.getDefaultEntry();
669    }
670
671    /**
672     * Sets the global default redelivery policy to be used when a message is delivered
673     * but the session is rolled back
674     */
675    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
676        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
677    }
678
679    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
680        return this.redeliveryPolicyMap;
681    }
682
683    /**
684     * Sets the global redelivery policy mapping to be used when a message is delivered
685     * but the session is rolled back
686     */
687    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
688        this.redeliveryPolicyMap = redeliveryPolicyMap;
689    }
690
691    public MessageTransformer getTransformer() {
692        return transformer;
693    }
694
695    /**
696     * @return the sendTimeout (in milliseconds)
697     */
698    public int getSendTimeout() {
699        return sendTimeout;
700    }
701
702    /**
703     * @param sendTimeout the sendTimeout to set (in milliseconds)
704     */
705    public void setSendTimeout(int sendTimeout) {
706        this.sendTimeout = sendTimeout;
707    }
708
709    /**
710     * @return the sendAcksAsync
711     */
712    public boolean isSendAcksAsync() {
713        return sendAcksAsync;
714    }
715
716    /**
717     * @param sendAcksAsync the sendAcksAsync to set
718     */
719    public void setSendAcksAsync(boolean sendAcksAsync) {
720        this.sendAcksAsync = sendAcksAsync;
721    }
722
723    /**
724     * @return the messagePrioritySupported
725     */
726    public boolean isMessagePrioritySupported() {
727        return this.messagePrioritySupported;
728    }
729
730    /**
731     * @param messagePrioritySupported the messagePrioritySupported to set
732     */
733    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
734        this.messagePrioritySupported = messagePrioritySupported;
735    }
736
737
738    /**
739     * Sets the transformer used to transform messages before they are sent on
740     * to the JMS bus or when they are received from the bus but before they are
741     * delivered to the JMS client
742     */
743    public void setTransformer(MessageTransformer transformer) {
744        this.transformer = transformer;
745    }
746
747    @SuppressWarnings({ "unchecked", "rawtypes" })
748    @Override
749    public void buildFromProperties(Properties properties) {
750
751        if (properties == null) {
752            properties = new Properties();
753        }
754
755        String temp = properties.getProperty(Context.PROVIDER_URL);
756        if (temp == null || temp.length() == 0) {
757            temp = properties.getProperty("brokerURL");
758        }
759        if (temp != null && temp.length() > 0) {
760            setBrokerURL(temp);
761        }
762
763        Map<String, Object> p = new HashMap(properties);
764        buildFromMap(p);
765    }
766
767    public boolean buildFromMap(Map<String, Object> properties) {
768        boolean rc = false;
769
770        ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
771        if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
772            setPrefetchPolicy(p);
773            rc = true;
774        }
775
776        RedeliveryPolicy rp = new RedeliveryPolicy();
777        if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
778            setRedeliveryPolicy(rp);
779            rc = true;
780        }
781
782        BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
783        if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
784            setBlobTransferPolicy(blobTransferPolicy);
785            rc = true;
786        }
787
788        rc |= IntrospectionSupport.setProperties(this, properties);
789
790        return rc;
791    }
792
793    @Override
794    public void populateProperties(Properties props) {
795        props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
796
797        if (getBrokerURL() != null) {
798            props.setProperty(Context.PROVIDER_URL, getBrokerURL());
799            props.setProperty("brokerURL", getBrokerURL());
800        }
801
802        if (getClientID() != null) {
803            props.setProperty("clientID", getClientID());
804        }
805
806        IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
807        IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
808        IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
809
810        props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
811        props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
812        props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
813        props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
814
815        if (getPassword() != null) {
816            props.setProperty("password", getPassword());
817        }
818
819        props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
820        props.setProperty("useCompression", Boolean.toString(isUseCompression()));
821        props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
822        props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
823
824        if (getUserName() != null) {
825            props.setProperty("userName", getUserName());
826        }
827
828        props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
829        props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
830        props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
831        props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
832        props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
833        props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
834        props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
835        props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
836        props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
837        props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
838        props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
839        props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
840        props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck()));
841        props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery()));
842        props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize()));
843        props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
844        props.setProperty("consumerFailoverRedeliveryWaitPeriod", Long.toString(getConsumerFailoverRedeliveryWaitPeriod()));
845        props.setProperty("rmIdFromConnectionId", Boolean.toString(isRmIdFromConnectionId()));
846        props.setProperty("consumerExpiryCheckEnabled", Boolean.toString(isConsumerExpiryCheckEnabled()));
847    }
848
    /**
     * @return the current value of the {@code useCompression} flag
     */
    public boolean isUseCompression() {
        return useCompression;
    }
852
853    /**
854     * Enables the use of compression of the message bodies
855     */
856    public void setUseCompression(boolean useCompression) {
857        this.useCompression = useCompression;
858    }
859
    /**
     * @return the current value of the {@code objectMessageSerializationDefered} flag
     */
    public boolean isObjectMessageSerializationDefered() {
        return objectMessageSerializationDefered;
    }
863
864    /**
865     * When an object is set on an ObjectMessage, the JMS spec requires the
866     * object to be serialized by that set method. Enabling this flag causes the
867     * object to not get serialized. The object may subsequently get serialized
868     * if the message needs to be sent over a socket or stored to disk.
869     */
870    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
871        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
872    }
873
    /**
     * @return the current value of the {@code dispatchAsync} flag
     */
    public boolean isDispatchAsync() {
        return dispatchAsync;
    }
877
878    /**
879     * Enables or disables the default setting of whether or not consumers have
880     * their messages <a
881     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
882     * synchronously or asynchronously by the broker</a>. For non-durable
883     * topics for example we typically dispatch synchronously by default to
884     * minimize context switches which boost performance. However sometimes its
885     * better to go slower to ensure that a single blocked consumer socket does
886     * not block delivery to other consumers.
887     *
888     * @param asyncDispatch If true then consumers created on this connection
889     *                will default to having their messages dispatched
890     *                asynchronously. The default value is true.
891     */
892    public void setDispatchAsync(boolean asyncDispatch) {
893        this.dispatchAsync = asyncDispatch;
894    }
895
896    /**
897     * @return Returns the closeTimeout.
898     */
899    public int getCloseTimeout() {
900        return closeTimeout;
901    }
902
903    /**
904     * Sets the timeout before a close is considered complete. Normally a
905     * close() on a connection waits for confirmation from the broker; this
906     * allows that operation to timeout to save the client hanging if there is
907     * no broker
908     */
909    public void setCloseTimeout(int closeTimeout) {
910        this.closeTimeout = closeTimeout;
911    }
912
913    /**
914     * @return Returns the alwaysSessionAsync.
915     */
916    public boolean isAlwaysSessionAsync() {
917        return alwaysSessionAsync;
918    }
919
920    /**
921     * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
922     * the Connection. However, a separate thread is always used if there is more than one session, or the session
923     * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
924     * happens asynchronously.
925     */
926    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
927        this.alwaysSessionAsync = alwaysSessionAsync;
928    }
929
930    /**
931     * @return Returns the optimizeAcknowledge.
932     */
933    public boolean isOptimizeAcknowledge() {
934        return optimizeAcknowledge;
935    }
936
937    /**
938     * @param optimizeAcknowledge The optimizeAcknowledge to set.
939     */
940    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
941        this.optimizeAcknowledge = optimizeAcknowledge;
942    }
943
944    /**
945     * The max time in milliseconds between optimized ack batches
946     * @param optimizeAcknowledgeTimeOut
947     */
948    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
949        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
950    }
951
    /**
     * @return the currently configured {@code optimizeAcknowledgeTimeOut} value
     */
    public long getOptimizeAcknowledgeTimeOut() {
        return optimizeAcknowledgeTimeOut;
    }
955
    /**
     * @return the current value of the {@code nestedMapAndListEnabled} flag
     */
    public boolean isNestedMapAndListEnabled() {
        return nestedMapAndListEnabled;
    }
959
960    /**
961     * Enables/disables whether or not Message properties and MapMessage entries
962     * support <a
963     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
964     * Structures</a> of Map and List objects
965     */
966    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
967        this.nestedMapAndListEnabled = structuredMapsEnabled;
968    }
969
    /**
     * @return the configured client ID prefix, or {@code null} if none has been set
     */
    public String getClientIDPrefix() {
        return clientIDPrefix;
    }
973
974    /**
975     * Sets the prefix used by autogenerated JMS Client ID values which are used
976     * if the JMS client does not explicitly specify on.
977     *
978     * @param clientIDPrefix
979     */
980    public void setClientIDPrefix(String clientIDPrefix) {
981        this.clientIDPrefix = clientIDPrefix;
982    }
983
984    protected synchronized IdGenerator getClientIdGenerator() {
985        if (clientIdGenerator == null) {
986            if (clientIDPrefix != null) {
987                clientIdGenerator = new IdGenerator(clientIDPrefix);
988            } else {
989                clientIdGenerator = new IdGenerator();
990            }
991        }
992        return clientIdGenerator;
993    }
994
995    protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
996        this.clientIdGenerator = clientIdGenerator;
997    }
998
999    /**
1000     * Sets the prefix used by connection id generator
1001     * @param connectionIDPrefix
1002     */
1003    public void setConnectionIDPrefix(String connectionIDPrefix) {
1004        this.connectionIDPrefix = connectionIDPrefix;
1005    }
1006
1007    protected synchronized IdGenerator getConnectionIdGenerator() {
1008        if (connectionIdGenerator == null) {
1009            if (connectionIDPrefix != null) {
1010                connectionIdGenerator = new IdGenerator(connectionIDPrefix);
1011            } else {
1012                connectionIdGenerator = new IdGenerator();
1013            }
1014        }
1015        return connectionIdGenerator;
1016    }
1017
1018    protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
1019        this.connectionIdGenerator = connectionIdGenerator;
1020    }
1021
1022    /**
1023     * @return the statsEnabled
1024     */
1025    public boolean isStatsEnabled() {
1026        return this.factoryStats.isEnabled();
1027    }
1028
1029    /**
1030     * @param statsEnabled the statsEnabled to set
1031     */
1032    public void setStatsEnabled(boolean statsEnabled) {
1033        this.factoryStats.setEnabled(statsEnabled);
1034    }
1035
    /**
     * Returns the configured producer window size. Synchronized on this
     * factory, like the matching setter.
     *
     * @return the current {@code producerWindowSize} value
     */
    public synchronized int getProducerWindowSize() {
        return producerWindowSize;
    }
1039
    /**
     * Stores the producer window size. Synchronized on this factory, like the
     * matching getter.
     *
     * @param producerWindowSize the window size to store on this factory
     */
    public synchronized void setProducerWindowSize(int producerWindowSize) {
        this.producerWindowSize = producerWindowSize;
    }
1043
    /**
     * @return the currently configured {@code warnAboutUnstartedConnectionTimeout} value
     */
    public long getWarnAboutUnstartedConnectionTimeout() {
        return warnAboutUnstartedConnectionTimeout;
    }
1047
1048    /**
1049     * Enables the timeout from a connection creation to when a warning is
1050     * generated if the connection is not properly started via
1051     * {@link Connection#start()} and a message is received by a consumer. It is
1052     * a very common gotcha to forget to <a
1053     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1054     * the connection</a> so this option makes the default case to create a
1055     * warning if the user forgets. To disable the warning just set the value to <
1056     * 0 (say -1).
1057     */
1058    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1059        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1060    }
1061
    /**
     * @return the transport listener configured on this factory, or {@code null} if none has been set
     */
    public TransportListener getTransportListener() {
        return transportListener;
    }
1065
1066    /**
1067     * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
1068     * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
1069     * a transport listener.
1070     *
1071     * @param transportListener sets the listener to be registered on all connections
1072     * created by this factory
1073     */
1074    public void setTransportListener(TransportListener transportListener) {
1075        this.transportListener = transportListener;
1076    }
1077
1078
    /**
     * @return the exception listener configured on this factory, or {@code null} if none has been set
     */
    public ExceptionListener getExceptionListener() {
        return exceptionListener;
    }
1082
1083    /**
1084     * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
1085     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1086     * an exception listener.
1087     * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
1088     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1089     * @param exceptionListener sets the exception listener to be registered on all connections
1090     * created by this factory
1091     */
1092    public void setExceptionListener(ExceptionListener exceptionListener) {
1093        this.exceptionListener = exceptionListener;
1094    }
1095
    /**
     * @return the currently configured {@code auditDepth} value
     */
    public int getAuditDepth() {
        return auditDepth;
    }
1099
    /**
     * Stores the {@code auditDepth} setting on this factory.
     *
     * @param auditDepth the audit depth to store
     */
    public void setAuditDepth(int auditDepth) {
        this.auditDepth = auditDepth;
    }
1103
    /**
     * @return the currently configured {@code auditMaximumProducerNumber} value
     */
    public int getAuditMaximumProducerNumber() {
        return auditMaximumProducerNumber;
    }
1107
    /**
     * Stores the {@code auditMaximumProducerNumber} setting on this factory.
     *
     * @param auditMaximumProducerNumber the value to store
     */
    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
        this.auditMaximumProducerNumber = auditMaximumProducerNumber;
    }
1111
    /**
     * Stores the {@code useDedicatedTaskRunner} flag on this factory.
     *
     * @param useDedicatedTaskRunner the new value of the flag
     */
    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
    }
1115
    /**
     * @return the current value of the {@code useDedicatedTaskRunner} flag
     */
    public boolean isUseDedicatedTaskRunner() {
        return useDedicatedTaskRunner;
    }
1119
    /**
     * Stores the {@code consumerFailoverRedeliveryWaitPeriod} setting on this factory.
     *
     * @param consumerFailoverRedeliveryWaitPeriod the wait period to store
     */
    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
    }
1123
    /**
     * @return the currently configured {@code consumerFailoverRedeliveryWaitPeriod} value
     */
    public long getConsumerFailoverRedeliveryWaitPeriod() {
        return consumerFailoverRedeliveryWaitPeriod;
    }
1127
    /**
     * @return the client internal exception listener configured on this factory,
     *         or {@code null} if none has been set
     */
    public ClientInternalExceptionListener getClientInternalExceptionListener() {
        return clientInternalExceptionListener;
    }
1131
1132    /**
1133     * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
1134     * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1135     * an exception listener.
1136     * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
1137     * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1138     * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
1139     * created by this factory
1140     */
1141    public void setClientInternalExceptionListener(
1142            ClientInternalExceptionListener clientInternalExceptionListener) {
1143        this.clientInternalExceptionListener = clientInternalExceptionListener;
1144    }
1145
1146    /**
1147     * @return the checkForDuplicates
1148     */
1149    public boolean isCheckForDuplicates() {
1150        return this.checkForDuplicates;
1151    }
1152
1153    /**
1154     * @param checkForDuplicates the checkForDuplicates to set
1155     */
1156    public void setCheckForDuplicates(boolean checkForDuplicates) {
1157        this.checkForDuplicates = checkForDuplicates;
1158    }
1159
    /**
     * @return the current value of the {@code transactedIndividualAck} flag
     */
    public boolean isTransactedIndividualAck() {
         return transactedIndividualAck;
     }
1163
1164     /**
1165      * when true, submit individual transacted acks immediately rather than with transaction completion.
1166      * This allows the acks to represent delivery status which can be persisted on rollback
1167      * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean)  true
1168      */
1169     public void setTransactedIndividualAck(boolean transactedIndividualAck) {
1170         this.transactedIndividualAck = transactedIndividualAck;
1171     }
1172
1173
     /**
      * @return the current value of the {@code nonBlockingRedelivery} flag
      */
     public boolean isNonBlockingRedelivery() {
         return nonBlockingRedelivery;
     }
1177
1178     /**
1179      * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
1180      * from a rolled back transaction.  This implies that message order will not be preserved and
1181      * also will result in the TransactedIndividualAck option to be enabled.
1182      */
1183     public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
1184         this.nonBlockingRedelivery = nonBlockingRedelivery;
1185     }
1186
    /**
     * @return the currently configured {@code maxThreadPoolSize} value
     */
    public int getMaxThreadPoolSize() {
        return maxThreadPoolSize;
    }
1190
    /**
     * Stores the {@code maxThreadPoolSize} setting on this factory.
     *
     * @param maxThreadPoolSize the value to store
     */
    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
        this.maxThreadPoolSize = maxThreadPoolSize;
    }
1194
    /**
     * @return the {@link TaskRunnerFactory} configured on this factory, or {@code null} if none has been set
     */
    public TaskRunnerFactory getSessionTaskRunner() {
        return sessionTaskRunner;
    }
1198
    /**
     * Stores the session task runner factory on this factory.
     *
     * @param sessionTaskRunner the {@link TaskRunnerFactory} to store
     */
    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
        this.sessionTaskRunner = sessionTaskRunner;
    }
1202
    /**
     * @return the {@link RejectedExecutionHandler} configured on this factory, or {@code null} if none has been set
     */
    public RejectedExecutionHandler getRejectedTaskHandler() {
        return rejectedTaskHandler;
    }
1206
    /**
     * Stores the rejected task handler on this factory.
     *
     * @param rejectedTaskHandler the {@link RejectedExecutionHandler} to store
     */
    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
        this.rejectedTaskHandler = rejectedTaskHandler;
    }
1210
1211    /**
1212     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
1213     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
1214     * will not do any background Message acknowledgment.
1215     *
1216     * @return the scheduledOptimizedAckInterval
1217     */
1218    public long getOptimizedAckScheduledAckInterval() {
1219        return optimizedAckScheduledAckInterval;
1220    }
1221
1222    /**
1223     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
1224     * have been configured with optimizeAcknowledge enabled.
1225     *
1226     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
1227     */
1228    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
1229        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
1230    }
1231
1232
    /**
     * @return the current value of the {@code rmIdFromConnectionId} flag
     */
    public boolean isRmIdFromConnectionId() {
        return rmIdFromConnectionId;
    }
1236
1237    /**
1238     * uses the connection id as the resource identity for XAResource.isSameRM
1239     * ensuring join will only occur on a single connection
1240     */
1241    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
1242        this.rmIdFromConnectionId = rmIdFromConnectionId;
1243    }
1244
1245    /**
1246     * @return true if MessageConsumer instance will check for expired messages before dispatch.
1247     */
1248    public boolean isConsumerExpiryCheckEnabled() {
1249        return consumerExpiryCheckEnabled;
1250    }
1251
1252    /**
1253     * Controls whether message expiration checking is done in each MessageConsumer
1254     * prior to dispatching a message.  Disabling this check can lead to consumption
1255     * of expired messages.
1256     *
1257     * @param consumerExpiryCheckEnabled
1258     *        controls whether expiration checking is done prior to dispatch.
1259     */
1260    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
1261        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
1262    }
1263}