001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq;
018    
019    import java.net.URI;
020    import java.net.URISyntaxException;
021    import java.util.HashMap;
022    import java.util.Map;
023    import java.util.Properties;
024    import java.util.concurrent.RejectedExecutionHandler;
025    
026    import javax.jms.Connection;
027    import javax.jms.ConnectionFactory;
028    import javax.jms.ExceptionListener;
029    import javax.jms.JMSException;
030    import javax.jms.QueueConnection;
031    import javax.jms.QueueConnectionFactory;
032    import javax.jms.TopicConnection;
033    import javax.jms.TopicConnectionFactory;
034    import javax.naming.Context;
035    
036    import org.apache.activemq.blob.BlobTransferPolicy;
037    import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
038    import org.apache.activemq.jndi.JNDIBaseStorable;
039    import org.apache.activemq.management.JMSStatsImpl;
040    import org.apache.activemq.management.StatsCapable;
041    import org.apache.activemq.management.StatsImpl;
042    import org.apache.activemq.thread.TaskRunnerFactory;
043    import org.apache.activemq.transport.Transport;
044    import org.apache.activemq.transport.TransportFactory;
045    import org.apache.activemq.transport.TransportListener;
046    import org.apache.activemq.util.IdGenerator;
047    import org.apache.activemq.util.IntrospectionSupport;
048    import org.apache.activemq.util.JMSExceptionSupport;
049    import org.apache.activemq.util.URISupport;
050    import org.apache.activemq.util.URISupport.CompositeData;
051    
052    /**
053     * A ConnectionFactory is an Administered object, and is used for creating
054     * Connections. <p/> This class also implements QueueConnectionFactory and
055     * TopicConnectionFactory. You can use this connection to create both
056     * QueueConnections and TopicConnections.
057     *
058     *
059     * @see javax.jms.ConnectionFactory
060     */
061    public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
062    
// ---- Defaults ---------------------------------------------------------

/** Plain TCP broker address on the local host. */
public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616";
/** The bind URL prefixed with the {@code failover://} scheme; used by the no-arg constructor. */
public static final String DEFAULT_BROKER_URL = "failover://" + DEFAULT_BROKER_BIND_URL;
public static final String DEFAULT_USER = null;
public static final String DEFAULT_PASSWORD = null;
public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;

// ---- State visible to subclasses --------------------------------------

protected URI brokerURL;
protected String userName;
protected String password;
protected String clientID;
protected boolean dispatchAsync = true;
protected boolean alwaysSessionAsync = true;

/** Statistics object handed to every connection created by this factory. */
JMSStatsImpl factoryStats = new JMSStatsImpl();

// ---- Identifier generation --------------------------------------------

private IdGenerator clientIdGenerator;
private String clientIDPrefix;
private IdGenerator connectionIdGenerator;
private String connectionIDPrefix;

// ---- Client policies --------------------------------------------------

private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
{
    // The map always starts out with a default redelivery policy entry.
    redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy());
}
private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
private MessageTransformer transformer;

// ---- Per-connection settings (copied onto connections in configureConnection)

private boolean disableTimeStampsByDefault;
private boolean optimizedMessageDispatch = true;
private long optimizeAcknowledgeTimeOut = 300;
private long optimizedAckScheduledAckInterval = 0;
private boolean copyMessageOnSend = true;
private boolean useCompression;
private boolean objectMessageSerializationDefered;
private boolean useAsyncSend;
private boolean optimizeAcknowledge;
private int closeTimeout = 15000;
private boolean useRetroactiveConsumer;
private boolean exclusiveConsumer;
private boolean nestedMapAndListEnabled = true;
private boolean alwaysSyncSend;
private boolean watchTopicAdvisories = true;
private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout = 0;
private boolean sendAcksAsync = true;
private TransportListener transportListener;
private ExceptionListener exceptionListener;
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
private boolean useDedicatedTaskRunner;
private long consumerFailoverRedeliveryWaitPeriod = 0;
private boolean checkForDuplicates = true;
private ClientInternalExceptionListener clientInternalExceptionListener;
private boolean messagePrioritySupported = true;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
private TaskRunnerFactory sessionTaskRunner;
private RejectedExecutionHandler rejectedTaskHandler = null;
125    
126        // /////////////////////////////////////////////
127        //
128        // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
129        //
130        // /////////////////////////////////////////////
131    
/**
 * Creates a factory that targets {@link #DEFAULT_BROKER_URL}.
 */
public ActiveMQConnectionFactory() {
    this(DEFAULT_BROKER_URL);
}
135    
/**
 * Creates a factory for the broker addressed by the given URL string.
 *
 * @param brokerURL textual broker URI
 * @throws IllegalArgumentException if the string is not a syntactically valid URI
 */
public ActiveMQConnectionFactory(String brokerURL) {
    this(createURI(brokerURL));
}
139    
140        public ActiveMQConnectionFactory(URI brokerURL) {
141            setBrokerURL(brokerURL.toString());
142        }
143    
144        public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
145            setUserName(userName);
146            setPassword(password);
147            setBrokerURL(brokerURL.toString());
148        }
149    
/**
 * Creates a factory with default credentials and a broker URL string.
 *
 * @param userName user name stored on the factory
 * @param password password stored on the factory
 * @param brokerURL textual broker URI, handed to {@link #setBrokerURL(String)}
 */
public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
    setUserName(userName);
    setPassword(password);
    setBrokerURL(brokerURL);
}
155    
156        /**
157         * Returns a copy of the given connection factory
158         */
159        public ActiveMQConnectionFactory copy() {
160            try {
161                return (ActiveMQConnectionFactory)super.clone();
162            } catch (CloneNotSupportedException e) {
163                throw new RuntimeException("This should never happen: " + e, e);
164            }
165        }
166    
167        /**
168         * @param brokerURL
169         * @return
170         * @throws URISyntaxException
171         */
172        private static URI createURI(String brokerURL) {
173            try {
174                return new URI(brokerURL);
175            } catch (URISyntaxException e) {
176                throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
177            }
178        }
179    
180        /**
181         * @return Returns the Connection.
182         */
183        public Connection createConnection() throws JMSException {
184            return createActiveMQConnection();
185        }
186    
187        /**
188         * @return Returns the Connection.
189         */
190        public Connection createConnection(String userName, String password) throws JMSException {
191            return createActiveMQConnection(userName, password);
192        }
193    
194        /**
195         * @return Returns the QueueConnection.
196         * @throws JMSException
197         */
198        public QueueConnection createQueueConnection() throws JMSException {
199            return createActiveMQConnection().enforceQueueOnlyConnection();
200        }
201    
202        /**
203         * @return Returns the QueueConnection.
204         */
205        public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
206            return createActiveMQConnection(userName, password).enforceQueueOnlyConnection();
207        }
208    
209        /**
210         * @return Returns the TopicConnection.
211         * @throws JMSException
212         */
213        public TopicConnection createTopicConnection() throws JMSException {
214            return createActiveMQConnection();
215        }
216    
217        /**
218         * @return Returns the TopicConnection.
219         */
220        public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
221            return createActiveMQConnection(userName, password);
222        }
223    
224        /**
225         * @returns the StatsImpl associated with this ConnectionFactory.
226         */
227        public StatsImpl getStats() {
228            return this.factoryStats;
229        }
230    
231        // /////////////////////////////////////////////
232        //
233        // Implementation methods.
234        //
235        // /////////////////////////////////////////////
236    
237        protected ActiveMQConnection createActiveMQConnection() throws JMSException {
238            return createActiveMQConnection(userName, password);
239        }
240    
241        /**
242         * Creates a Transport based on this object's connection settings. Separated
243         * from createActiveMQConnection to allow for subclasses to override.
244         *
245         * @return The newly created Transport.
246         * @throws JMSException If unable to create trasnport.
247         */
248        protected Transport createTransport() throws JMSException {
249            try {
250                return TransportFactory.connect(brokerURL);
251            } catch (Exception e) {
252                throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
253            }
254        }
255    
256        /**
257         * @return Returns the Connection.
258         */
259        protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
260            if (brokerURL == null) {
261                throw new ConfigurationException("brokerURL not set.");
262            }
263            ActiveMQConnection connection = null;
264            try {
265                Transport transport = createTransport();
266                connection = createActiveMQConnection(transport, factoryStats);
267    
268                connection.setUserName(userName);
269                connection.setPassword(password);
270    
271                configureConnection(connection);
272    
273                transport.start();
274    
275                if (clientID != null) {
276                    connection.setDefaultClientID(clientID);
277                }
278    
279                return connection;
280            } catch (JMSException e) {
281                // Clean up!
282                try {
283                    connection.close();
284                } catch (Throwable ignore) {
285                }
286                throw e;
287            } catch (Exception e) {
288                // Clean up!
289                try {
290                    connection.close();
291                } catch (Throwable ignore) {
292                }
293                throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
294            }
295        }
296    
297        protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
298            ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
299                    getConnectionIdGenerator(), stats);
300            return connection;
301        }
302    
303        protected void configureConnection(ActiveMQConnection connection) throws JMSException {
304            connection.setPrefetchPolicy(getPrefetchPolicy());
305            connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
306            connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
307            connection.setCopyMessageOnSend(isCopyMessageOnSend());
308            connection.setUseCompression(isUseCompression());
309            connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
310            connection.setDispatchAsync(isDispatchAsync());
311            connection.setUseAsyncSend(isUseAsyncSend());
312            connection.setAlwaysSyncSend(isAlwaysSyncSend());
313            connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
314            connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
315            connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
316            connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
317            connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
318            connection.setExclusiveConsumer(isExclusiveConsumer());
319            connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
320            connection.setTransformer(getTransformer());
321            connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
322            connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
323            connection.setProducerWindowSize(getProducerWindowSize());
324            connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
325            connection.setSendTimeout(getSendTimeout());
326            connection.setCloseTimeout(getCloseTimeout());
327            connection.setSendAcksAsync(isSendAcksAsync());
328            connection.setAuditDepth(getAuditDepth());
329            connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
330            connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
331            connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
332            connection.setCheckForDuplicates(isCheckForDuplicates());
333            connection.setMessagePrioritySupported(isMessagePrioritySupported());
334            connection.setTransactedIndividualAck(isTransactedIndividualAck());
335            connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
336            connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
337            connection.setSessionTaskRunner(getSessionTaskRunner());
338            connection.setRejectedTaskHandler(getRejectedTaskHandler());
339            connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
340            if (transportListener != null) {
341                connection.addTransportListener(transportListener);
342            }
343            if (exceptionListener != null) {
344                connection.setExceptionListener(exceptionListener);
345            }
346            if (clientInternalExceptionListener != null) {
347                connection.setClientInternalExceptionListener(clientInternalExceptionListener);
348            }
349        }
350    
351        // /////////////////////////////////////////////
352        //
353        // Property Accessors
354        //
355        // /////////////////////////////////////////////
356    
357        public String getBrokerURL() {
358            return brokerURL == null ? null : brokerURL.toString();
359        }
360    
361        /**
362         * Sets the <a
363         * href="http://activemq.apache.org/configuring-transports.html">connection
364         * URL</a> used to connect to the ActiveMQ broker.
365         */
366        public void setBrokerURL(String brokerURL) {
367            this.brokerURL = createURI(brokerURL);
368    
369            // Use all the properties prefixed with 'jms.' to set the connection
370            // factory
371            // options.
372            if (this.brokerURL.getQuery() != null) {
373                // It might be a standard URI or...
374                try {
375    
376                    Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery());
377                    Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms.");
378                    if (buildFromMap(jmsOptionsMap)) {
379                        if (!jmsOptionsMap.isEmpty()) {
380                            String msg = "There are " + jmsOptionsMap.size()
381                                + " jms options that couldn't be set on the ConnectionFactory."
382                                + " Check the options are spelled correctly."
383                                + " Unknown parameters=[" + jmsOptionsMap + "]."
384                                + " This connection factory cannot be started.";
385                            throw new IllegalArgumentException(msg);
386                        }
387    
388                        this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
389                    }
390    
391                } catch (URISyntaxException e) {
392                }
393    
394            } else {
395    
396                // It might be a composite URI.
397                try {
398                    CompositeData data = URISupport.parseComposite(this.brokerURL);
399                    Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms.");
400                    if (buildFromMap(jmsOptionsMap)) {
401                        if (!jmsOptionsMap.isEmpty()) {
402                            String msg = "There are " + jmsOptionsMap.size()
403                                + " jms options that couldn't be set on the ConnectionFactory."
404                                + " Check the options are spelled correctly."
405                                + " Unknown parameters=[" + jmsOptionsMap + "]."
406                                + " This connection factory cannot be started.";
407                            throw new IllegalArgumentException(msg);
408                        }
409    
410                        this.brokerURL = data.toURI();
411                    }
412                } catch (URISyntaxException e) {
413                }
414            }
415        }
416    
417        public String getClientID() {
418            return clientID;
419        }
420    
421        /**
422         * Sets the JMS clientID to use for the created connection. Note that this
423         * can only be used by one connection at once so generally its a better idea
424         * to set the clientID on a Connection
425         */
426        public void setClientID(String clientID) {
427            this.clientID = clientID;
428        }
429    
430        public boolean isCopyMessageOnSend() {
431            return copyMessageOnSend;
432        }
433    
434        /**
435         * Should a JMS message be copied to a new JMS Message object as part of the
436         * send() method in JMS. This is enabled by default to be compliant with the
437         * JMS specification. You can disable it if you do not mutate JMS messages
438         * after they are sent for a performance boost
439         */
440        public void setCopyMessageOnSend(boolean copyMessageOnSend) {
441            this.copyMessageOnSend = copyMessageOnSend;
442        }
443    
444        public boolean isDisableTimeStampsByDefault() {
445            return disableTimeStampsByDefault;
446        }
447    
448        /**
449         * Sets whether or not timestamps on messages should be disabled or not. If
450         * you disable them it adds a small performance boost.
451         */
452        public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
453            this.disableTimeStampsByDefault = disableTimeStampsByDefault;
454        }
455    
456        public boolean isOptimizedMessageDispatch() {
457            return optimizedMessageDispatch;
458        }
459    
460        /**
461         * If this flag is set then an larger prefetch limit is used - only
462         * applicable for durable topic subscribers.
463         */
464        public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
465            this.optimizedMessageDispatch = optimizedMessageDispatch;
466        }
467    
468        public String getPassword() {
469            return password;
470        }
471    
472        /**
473         * Sets the JMS password used for connections created from this factory
474         */
475        public void setPassword(String password) {
476            this.password = password;
477        }
478    
479        public ActiveMQPrefetchPolicy getPrefetchPolicy() {
480            return prefetchPolicy;
481        }
482    
483        /**
484         * Sets the <a
485         * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
486         * policy</a> for consumers created by this connection.
487         */
488        public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
489            this.prefetchPolicy = prefetchPolicy;
490        }
491    
492        public boolean isUseAsyncSend() {
493            return useAsyncSend;
494        }
495    
496        public BlobTransferPolicy getBlobTransferPolicy() {
497            return blobTransferPolicy;
498        }
499    
500        /**
501         * Sets the policy used to describe how out-of-band BLOBs (Binary Large
502         * OBjects) are transferred from producers to brokers to consumers
503         */
504        public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
505            this.blobTransferPolicy = blobTransferPolicy;
506        }
507    
508        /**
509         * Forces the use of <a
510         * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
511         * adds a massive performance boost; but means that the send() method will
512         * return immediately whether the message has been sent or not which could
513         * lead to message loss.
514         */
515        public void setUseAsyncSend(boolean useAsyncSend) {
516            this.useAsyncSend = useAsyncSend;
517        }
518    
519        public synchronized boolean isWatchTopicAdvisories() {
520            return watchTopicAdvisories;
521        }
522    
523        public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
524            this.watchTopicAdvisories = watchTopicAdvisories;
525        }
526    
527        /**
528         * @return true if always sync send messages
529         */
530        public boolean isAlwaysSyncSend() {
531            return this.alwaysSyncSend;
532        }
533    
534        /**
535         * Set true if always require messages to be sync sent
536         *
537         * @param alwaysSyncSend
538         */
539        public void setAlwaysSyncSend(boolean alwaysSyncSend) {
540            this.alwaysSyncSend = alwaysSyncSend;
541        }
542    
543        public String getUserName() {
544            return userName;
545        }
546    
547        /**
548         * Sets the JMS userName used by connections created by this factory
549         */
550        public void setUserName(String userName) {
551            this.userName = userName;
552        }
553    
554        public boolean isUseRetroactiveConsumer() {
555            return useRetroactiveConsumer;
556        }
557    
558        /**
559         * Sets whether or not retroactive consumers are enabled. Retroactive
560         * consumers allow non-durable topic subscribers to receive old messages
561         * that were published before the non-durable subscriber started.
562         */
563        public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
564            this.useRetroactiveConsumer = useRetroactiveConsumer;
565        }
566    
567        public boolean isExclusiveConsumer() {
568            return exclusiveConsumer;
569        }
570    
571        /**
572         * Enables or disables whether or not queue consumers should be exclusive or
573         * not for example to preserve ordering when not using <a
574         * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
575         *
576         * @param exclusiveConsumer
577         */
578        public void setExclusiveConsumer(boolean exclusiveConsumer) {
579            this.exclusiveConsumer = exclusiveConsumer;
580        }
581    
582        public RedeliveryPolicy getRedeliveryPolicy() {
583            return redeliveryPolicyMap.getDefaultEntry();
584        }
585    
586        /**
587         * Sets the global default redelivery policy to be used when a message is delivered
588         * but the session is rolled back
589         */
590        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
591            this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
592        }
593    
594        public RedeliveryPolicyMap getRedeliveryPolicyMap() {
595            return this.redeliveryPolicyMap;
596        }
597    
598        /**
599         * Sets the global redelivery policy mapping to be used when a message is delivered
600         * but the session is rolled back
601         */
602        public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
603            this.redeliveryPolicyMap = redeliveryPolicyMap;
604        }
605    
606        public MessageTransformer getTransformer() {
607            return transformer;
608        }
609    
610        /**
611         * @return the sendTimeout
612         */
613        public int getSendTimeout() {
614            return sendTimeout;
615        }
616    
617        /**
618         * @param sendTimeout the sendTimeout to set
619         */
620        public void setSendTimeout(int sendTimeout) {
621            this.sendTimeout = sendTimeout;
622        }
623    
624        /**
625         * @return the sendAcksAsync
626         */
627        public boolean isSendAcksAsync() {
628            return sendAcksAsync;
629        }
630    
631        /**
632         * @param sendAcksAsync the sendAcksAsync to set
633         */
634        public void setSendAcksAsync(boolean sendAcksAsync) {
635            this.sendAcksAsync = sendAcksAsync;
636        }
637    
638        /**
639         * @return the messagePrioritySupported
640         */
641        public boolean isMessagePrioritySupported() {
642            return this.messagePrioritySupported;
643        }
644    
645        /**
646         * @param messagePrioritySupported the messagePrioritySupported to set
647         */
648        public void setMessagePrioritySupported(boolean messagePrioritySupported) {
649            this.messagePrioritySupported = messagePrioritySupported;
650        }
651    
652    
653        /**
654         * Sets the transformer used to transform messages before they are sent on
655         * to the JMS bus or when they are received from the bus but before they are
656         * delivered to the JMS client
657         */
658        public void setTransformer(MessageTransformer transformer) {
659            this.transformer = transformer;
660        }
661    
662        @SuppressWarnings({ "unchecked", "rawtypes" })
663        @Override
664        public void buildFromProperties(Properties properties) {
665    
666            if (properties == null) {
667                properties = new Properties();
668            }
669    
670            String temp = properties.getProperty(Context.PROVIDER_URL);
671            if (temp == null || temp.length() == 0) {
672                temp = properties.getProperty("brokerURL");
673            }
674            if (temp != null && temp.length() > 0) {
675                setBrokerURL(temp);
676            }
677    
678            Map<String, Object> p = new HashMap(properties);
679            buildFromMap(p);
680        }
681    
682        public boolean buildFromMap(Map<String, Object> properties) {
683            boolean rc = false;
684    
685            ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
686            if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
687                setPrefetchPolicy(p);
688                rc = true;
689            }
690    
691            RedeliveryPolicy rp = new RedeliveryPolicy();
692            if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
693                setRedeliveryPolicy(rp);
694                rc = true;
695            }
696    
697            BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
698            if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
699                setBlobTransferPolicy(blobTransferPolicy);
700                rc = true;
701            }
702    
703            rc |= IntrospectionSupport.setProperties(this, properties);
704    
705            return rc;
706        }
707    
708        @Override
709        public void populateProperties(Properties props) {
710            props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
711    
712            if (getBrokerURL() != null) {
713                props.setProperty(Context.PROVIDER_URL, getBrokerURL());
714                props.setProperty("brokerURL", getBrokerURL());
715            }
716    
717            if (getClientID() != null) {
718                props.setProperty("clientID", getClientID());
719            }
720    
721            IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
722            IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
723            IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");
724    
725            props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
726            props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
727            props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
728            props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));
729    
730            if (getPassword() != null) {
731                props.setProperty("password", getPassword());
732            }
733    
734            props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
735            props.setProperty("useCompression", Boolean.toString(isUseCompression()));
736            props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
737            props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));
738    
739            if (getUserName() != null) {
740                props.setProperty("userName", getUserName());
741            }
742    
743            props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
744            props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
745            props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
746            props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
747            props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
748            props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
749            props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
750            props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
751            props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
752            props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
753            props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
754            props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
755            props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck()));
756            props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery()));
757            props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize()));
758            props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
759        }
760    
761        public boolean isUseCompression() {
762            return useCompression;
763        }
764    
765        /**
766         * Enables the use of compression of the message bodies
767         */
768        public void setUseCompression(boolean useCompression) {
769            this.useCompression = useCompression;
770        }
771    
772        public boolean isObjectMessageSerializationDefered() {
773            return objectMessageSerializationDefered;
774        }
775    
776        /**
777         * When an object is set on an ObjectMessage, the JMS spec requires the
778         * object to be serialized by that set method. Enabling this flag causes the
779         * object to not get serialized. The object may subsequently get serialized
780         * if the message needs to be sent over a socket or stored to disk.
781         */
782        public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
783            this.objectMessageSerializationDefered = objectMessageSerializationDefered;
784        }
785    
786        public boolean isDispatchAsync() {
787            return dispatchAsync;
788        }
789    
790        /**
791         * Enables or disables the default setting of whether or not consumers have
792         * their messages <a
793         * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
794         * synchronously or asynchronously by the broker</a>. For non-durable
795         * topics for example we typically dispatch synchronously by default to
796         * minimize context switches which boost performance. However sometimes its
797         * better to go slower to ensure that a single blocked consumer socket does
798         * not block delivery to other consumers.
799         *
800         * @param asyncDispatch If true then consumers created on this connection
801         *                will default to having their messages dispatched
802         *                asynchronously. The default value is true.
803         */
804        public void setDispatchAsync(boolean asyncDispatch) {
805            this.dispatchAsync = asyncDispatch;
806        }
807    
808        /**
809         * @return Returns the closeTimeout.
810         */
811        public int getCloseTimeout() {
812            return closeTimeout;
813        }
814    
815        /**
816         * Sets the timeout before a close is considered complete. Normally a
817         * close() on a connection waits for confirmation from the broker; this
818         * allows that operation to timeout to save the client hanging if there is
819         * no broker
820         */
821        public void setCloseTimeout(int closeTimeout) {
822            this.closeTimeout = closeTimeout;
823        }
824    
825        /**
826         * @return Returns the alwaysSessionAsync.
827         */
828        public boolean isAlwaysSessionAsync() {
829            return alwaysSessionAsync;
830        }
831    
832        /**
833         * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
834         * the Connection. However, a separate thread is always used if there is more than one session, or the session
835         * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
836         * happens asynchronously.
837         */
838        public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
839            this.alwaysSessionAsync = alwaysSessionAsync;
840        }
841    
842        /**
843         * @return Returns the optimizeAcknowledge.
844         */
845        public boolean isOptimizeAcknowledge() {
846            return optimizeAcknowledge;
847        }
848    
849        /**
850         * @param optimizeAcknowledge The optimizeAcknowledge to set.
851         */
852        public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
853            this.optimizeAcknowledge = optimizeAcknowledge;
854        }
855    
856        /**
857         * The max time in milliseconds between optimized ack batches
858         * @param optimizeAcknowledgeTimeOut
859         */
860        public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
861            this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
862        }
863    
864        public long getOptimizeAcknowledgeTimeOut() {
865            return optimizeAcknowledgeTimeOut;
866        }
867    
868        public boolean isNestedMapAndListEnabled() {
869            return nestedMapAndListEnabled;
870        }
871    
872        /**
873         * Enables/disables whether or not Message properties and MapMessage entries
874         * support <a
875         * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
876         * Structures</a> of Map and List objects
877         */
878        public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
879            this.nestedMapAndListEnabled = structuredMapsEnabled;
880        }
881    
882        public String getClientIDPrefix() {
883            return clientIDPrefix;
884        }
885    
886        /**
887         * Sets the prefix used by autogenerated JMS Client ID values which are used
888         * if the JMS client does not explicitly specify on.
889         *
890         * @param clientIDPrefix
891         */
892        public void setClientIDPrefix(String clientIDPrefix) {
893            this.clientIDPrefix = clientIDPrefix;
894        }
895    
896        protected synchronized IdGenerator getClientIdGenerator() {
897            if (clientIdGenerator == null) {
898                if (clientIDPrefix != null) {
899                    clientIdGenerator = new IdGenerator(clientIDPrefix);
900                } else {
901                    clientIdGenerator = new IdGenerator();
902                }
903            }
904            return clientIdGenerator;
905        }
906    
907        protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
908            this.clientIdGenerator = clientIdGenerator;
909        }
910    
911        /**
912         * Sets the prefix used by connection id generator
913         * @param connectionIDPrefix
914         */
915        public void setConnectionIDPrefix(String connectionIDPrefix) {
916            this.connectionIDPrefix = connectionIDPrefix;
917        }
918    
919        protected synchronized IdGenerator getConnectionIdGenerator() {
920            if (connectionIdGenerator == null) {
921                if (connectionIDPrefix != null) {
922                    connectionIdGenerator = new IdGenerator(connectionIDPrefix);
923                } else {
924                    connectionIdGenerator = new IdGenerator();
925                }
926            }
927            return connectionIdGenerator;
928        }
929    
930        protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
931            this.connectionIdGenerator = connectionIdGenerator;
932        }
933    
934        /**
935         * @return the statsEnabled
936         */
937        public boolean isStatsEnabled() {
938            return this.factoryStats.isEnabled();
939        }
940    
941        /**
942         * @param statsEnabled the statsEnabled to set
943         */
944        public void setStatsEnabled(boolean statsEnabled) {
945            this.factoryStats.setEnabled(statsEnabled);
946        }
947    
948        public synchronized int getProducerWindowSize() {
949            return producerWindowSize;
950        }
951    
952        public synchronized void setProducerWindowSize(int producerWindowSize) {
953            this.producerWindowSize = producerWindowSize;
954        }
955    
956        public long getWarnAboutUnstartedConnectionTimeout() {
957            return warnAboutUnstartedConnectionTimeout;
958        }
959    
960        /**
961         * Enables the timeout from a connection creation to when a warning is
962         * generated if the connection is not properly started via
963         * {@link Connection#start()} and a message is received by a consumer. It is
964         * a very common gotcha to forget to <a
965         * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
966         * the connection</a> so this option makes the default case to create a
967         * warning if the user forgets. To disable the warning just set the value to <
968         * 0 (say -1).
969         */
970        public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
971            this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
972        }
973    
974        public TransportListener getTransportListener() {
975            return transportListener;
976        }
977    
978        /**
979         * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
980         * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
981         * a transport listener.
982         *
983         * @param transportListener sets the listener to be registered on all connections
984         * created by this factory
985         */
986        public void setTransportListener(TransportListener transportListener) {
987            this.transportListener = transportListener;
988        }
989    
990    
991        public ExceptionListener getExceptionListener() {
992            return exceptionListener;
993        }
994    
995        /**
996         * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
997         * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
998         * an exception listener.
999         * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
1000         * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1001         * @param exceptionListener sets the exception listener to be registered on all connections
1002         * created by this factory
1003         */
1004        public void setExceptionListener(ExceptionListener exceptionListener) {
1005            this.exceptionListener = exceptionListener;
1006        }
1007    
1008        public int getAuditDepth() {
1009            return auditDepth;
1010        }
1011    
1012        public void setAuditDepth(int auditDepth) {
1013            this.auditDepth = auditDepth;
1014        }
1015    
1016        public int getAuditMaximumProducerNumber() {
1017            return auditMaximumProducerNumber;
1018        }
1019    
1020        public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
1021            this.auditMaximumProducerNumber = auditMaximumProducerNumber;
1022        }
1023    
1024        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1025            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1026        }
1027    
1028        public boolean isUseDedicatedTaskRunner() {
1029            return useDedicatedTaskRunner;
1030        }
1031    
1032        public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
1033            this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
1034        }
1035    
1036        public long getConsumerFailoverRedeliveryWaitPeriod() {
1037            return consumerFailoverRedeliveryWaitPeriod;
1038        }
1039    
1040        public ClientInternalExceptionListener getClientInternalExceptionListener() {
1041            return clientInternalExceptionListener;
1042        }
1043    
1044        /**
1045         * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
1046         * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1047         * an exception listener.
1048         * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
1049         * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1050         * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
1051         * created by this factory
1052         */
1053        public void setClientInternalExceptionListener(
1054                ClientInternalExceptionListener clientInternalExceptionListener) {
1055            this.clientInternalExceptionListener = clientInternalExceptionListener;
1056        }
1057    
1058        /**
1059         * @return the checkForDuplicates
1060         */
1061        public boolean isCheckForDuplicates() {
1062            return this.checkForDuplicates;
1063        }
1064    
1065        /**
1066         * @param checkForDuplicates the checkForDuplicates to set
1067         */
1068        public void setCheckForDuplicates(boolean checkForDuplicates) {
1069            this.checkForDuplicates = checkForDuplicates;
1070        }
1071    
1072        public boolean isTransactedIndividualAck() {
1073             return transactedIndividualAck;
1074         }
1075    
1076         /**
1077          * when true, submit individual transacted acks immediately rather than with transaction completion.
1078          * This allows the acks to represent delivery status which can be persisted on rollback
1079          * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean)  true
1080          */
1081         public void setTransactedIndividualAck(boolean transactedIndividualAck) {
1082             this.transactedIndividualAck = transactedIndividualAck;
1083         }
1084    
1085    
1086         public boolean isNonBlockingRedelivery() {
1087             return nonBlockingRedelivery;
1088         }
1089    
1090         /**
1091          * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
1092          * from a rolled back transaction.  This implies that message order will not be preserved and
1093          * also will result in the TransactedIndividualAck option to be enabled.
1094          */
1095         public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
1096             this.nonBlockingRedelivery = nonBlockingRedelivery;
1097         }
1098    
1099        public int getMaxThreadPoolSize() {
1100            return maxThreadPoolSize;
1101        }
1102    
1103        public void setMaxThreadPoolSize(int maxThreadPoolSize) {
1104            this.maxThreadPoolSize = maxThreadPoolSize;
1105        }
1106    
1107        public TaskRunnerFactory getSessionTaskRunner() {
1108            return sessionTaskRunner;
1109        }
1110    
1111        public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1112            this.sessionTaskRunner = sessionTaskRunner;
1113        }
1114    
1115        public RejectedExecutionHandler getRejectedTaskHandler() {
1116            return rejectedTaskHandler;
1117        }
1118    
1119        public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
1120            this.rejectedTaskHandler = rejectedTaskHandler;
1121        }
1122    
1123        /**
1124         * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
1125         * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
1126         * will not do any background Message acknowledgment.
1127         *
1128         * @return the scheduledOptimizedAckInterval
1129         */
1130        public long getOptimizedAckScheduledAckInterval() {
1131            return optimizedAckScheduledAckInterval;
1132        }
1133    
1134        /**
1135         * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
1136         * have been configured with optimizeAcknowledge enabled.
1137         *
1138         * @param scheduledOptimizedAckInterval the scheduledOptimizedAckInterval to set
1139         */
1140        public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
1141            this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
1142        }
1143    }