001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.net.URI;
020 import java.net.URISyntaxException;
021 import java.util.HashMap;
022 import java.util.Map;
023 import java.util.Properties;
024 import java.util.concurrent.RejectedExecutionHandler;
025
026 import javax.jms.Connection;
027 import javax.jms.ConnectionFactory;
028 import javax.jms.ExceptionListener;
029 import javax.jms.JMSException;
030 import javax.jms.QueueConnection;
031 import javax.jms.QueueConnectionFactory;
032 import javax.jms.TopicConnection;
033 import javax.jms.TopicConnectionFactory;
034 import javax.naming.Context;
035
036 import org.apache.activemq.blob.BlobTransferPolicy;
037 import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
038 import org.apache.activemq.jndi.JNDIBaseStorable;
039 import org.apache.activemq.management.JMSStatsImpl;
040 import org.apache.activemq.management.StatsCapable;
041 import org.apache.activemq.management.StatsImpl;
042 import org.apache.activemq.thread.TaskRunnerFactory;
043 import org.apache.activemq.transport.Transport;
044 import org.apache.activemq.transport.TransportFactory;
045 import org.apache.activemq.transport.TransportListener;
046 import org.apache.activemq.util.IdGenerator;
047 import org.apache.activemq.util.IntrospectionSupport;
048 import org.apache.activemq.util.JMSExceptionSupport;
049 import org.apache.activemq.util.URISupport;
050 import org.apache.activemq.util.URISupport.CompositeData;
051
052 /**
* A ConnectionFactory is an Administered object, and is used for creating
054 * Connections. <p/> This class also implements QueueConnectionFactory and
055 * TopicConnectionFactory. You can use this connection to create both
056 * QueueConnections and TopicConnections.
057 *
058 *
059 * @see javax.jms.ConnectionFactory
060 */
061 public class ActiveMQConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
062
// Defaults used when no explicit value is supplied. DEFAULT_BROKER_URL is the
// bind URL wrapped in the failover scheme and is what the no-arg constructor uses.
public static final String DEFAULT_BROKER_BIND_URL = "tcp://localhost:61616";
public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
public static final String DEFAULT_USER = null;
public static final String DEFAULT_PASSWORD = null;
public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;

// Connection target and default credentials; protected so subclasses can read them.
protected URI brokerURL;
protected String userName;
protected String password;
protected String clientID;
protected boolean dispatchAsync=true;
protected boolean alwaysSessionAsync=true;

// Statistics object handed to every connection created by this factory.
JMSStatsImpl factoryStats = new JMSStatsImpl();

// ID generators are created lazily by their getters, using the prefixes when set.
private IdGenerator clientIdGenerator;
private String clientIDPrefix;
private IdGenerator connectionIdGenerator;
private String connectionIDPrefix;

// client policies
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
{
    // Instance initializer: every factory starts with a default redelivery policy entry.
    redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy());
}
private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
private MessageTransformer transformer;

// Per-connection options; each one is copied onto new connections in configureConnection().
private boolean disableTimeStampsByDefault;
private boolean optimizedMessageDispatch = true;
private long optimizeAcknowledgeTimeOut = 300; // milliseconds
private long optimizedAckScheduledAckInterval = 0;
private boolean copyMessageOnSend = true;
private boolean useCompression;
private boolean objectMessageSerializationDefered;
private boolean useAsyncSend;
private boolean optimizeAcknowledge;
private int closeTimeout = 15000; // presumably milliseconds - TODO confirm
private boolean useRetroactiveConsumer;
private boolean exclusiveConsumer;
private boolean nestedMapAndListEnabled = true;
private boolean alwaysSyncSend;
private boolean watchTopicAdvisories = true;
private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout = 0;
private boolean sendAcksAsync=true;
// Optional listeners; only registered on a connection when non-null.
private TransportListener transportListener;
private ExceptionListener exceptionListener;
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
private boolean useDedicatedTaskRunner;
private long consumerFailoverRedeliveryWaitPeriod = 0;
private boolean checkForDuplicates = true;
private ClientInternalExceptionListener clientInternalExceptionListener;
private boolean messagePrioritySupported = true;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
private TaskRunnerFactory sessionTaskRunner;
private RejectedExecutionHandler rejectedTaskHandler = null;
125
126 // /////////////////////////////////////////////
127 //
128 // ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory Methods
129 //
130 // /////////////////////////////////////////////
131
/**
 * Creates a factory that targets {@link #DEFAULT_BROKER_URL}.
 */
public ActiveMQConnectionFactory() {
    this(DEFAULT_BROKER_URL);
}
135
/**
 * Creates a factory for the given broker URL.
 *
 * @param brokerURL the broker URL in string form
 * @throws IllegalArgumentException if {@code brokerURL} is not a valid URI
 */
public ActiveMQConnectionFactory(String brokerURL) {
    this(createURI(brokerURL));
}
139
140 public ActiveMQConnectionFactory(URI brokerURL) {
141 setBrokerURL(brokerURL.toString());
142 }
143
144 public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
145 setUserName(userName);
146 setPassword(password);
147 setBrokerURL(brokerURL.toString());
148 }
149
/**
 * Creates a factory with default credentials and a broker URL in string form.
 *
 * @param userName  default user name for connections created by this factory
 * @param password  default password for connections created by this factory
 * @param brokerURL the broker URL
 * @throws IllegalArgumentException if {@code brokerURL} is not a valid URI
 */
public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
    this.setUserName(userName);
    this.setPassword(password);
    this.setBrokerURL(brokerURL);
}
155
156 /**
157 * Returns a copy of the given connection factory
158 */
159 public ActiveMQConnectionFactory copy() {
160 try {
161 return (ActiveMQConnectionFactory)super.clone();
162 } catch (CloneNotSupportedException e) {
163 throw new RuntimeException("This should never happen: " + e, e);
164 }
165 }
166
167 /**
168 * @param brokerURL
169 * @return
170 * @throws URISyntaxException
171 */
172 private static URI createURI(String brokerURL) {
173 try {
174 return new URI(brokerURL);
175 } catch (URISyntaxException e) {
176 throw (IllegalArgumentException)new IllegalArgumentException("Invalid broker URI: " + brokerURL).initCause(e);
177 }
178 }
179
180 /**
181 * @return Returns the Connection.
182 */
183 public Connection createConnection() throws JMSException {
184 return createActiveMQConnection();
185 }
186
187 /**
188 * @return Returns the Connection.
189 */
190 public Connection createConnection(String userName, String password) throws JMSException {
191 return createActiveMQConnection(userName, password);
192 }
193
194 /**
195 * @return Returns the QueueConnection.
196 * @throws JMSException
197 */
198 public QueueConnection createQueueConnection() throws JMSException {
199 return createActiveMQConnection().enforceQueueOnlyConnection();
200 }
201
202 /**
203 * @return Returns the QueueConnection.
204 */
205 public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
206 return createActiveMQConnection(userName, password).enforceQueueOnlyConnection();
207 }
208
209 /**
210 * @return Returns the TopicConnection.
211 * @throws JMSException
212 */
213 public TopicConnection createTopicConnection() throws JMSException {
214 return createActiveMQConnection();
215 }
216
217 /**
218 * @return Returns the TopicConnection.
219 */
220 public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
221 return createActiveMQConnection(userName, password);
222 }
223
224 /**
225 * @returns the StatsImpl associated with this ConnectionFactory.
226 */
227 public StatsImpl getStats() {
228 return this.factoryStats;
229 }
230
231 // /////////////////////////////////////////////
232 //
233 // Implementation methods.
234 //
235 // /////////////////////////////////////////////
236
237 protected ActiveMQConnection createActiveMQConnection() throws JMSException {
238 return createActiveMQConnection(userName, password);
239 }
240
241 /**
242 * Creates a Transport based on this object's connection settings. Separated
243 * from createActiveMQConnection to allow for subclasses to override.
244 *
245 * @return The newly created Transport.
246 * @throws JMSException If unable to create trasnport.
247 */
248 protected Transport createTransport() throws JMSException {
249 try {
250 return TransportFactory.connect(brokerURL);
251 } catch (Exception e) {
252 throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
253 }
254 }
255
256 /**
257 * @return Returns the Connection.
258 */
259 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
260 if (brokerURL == null) {
261 throw new ConfigurationException("brokerURL not set.");
262 }
263 ActiveMQConnection connection = null;
264 try {
265 Transport transport = createTransport();
266 connection = createActiveMQConnection(transport, factoryStats);
267
268 connection.setUserName(userName);
269 connection.setPassword(password);
270
271 configureConnection(connection);
272
273 transport.start();
274
275 if (clientID != null) {
276 connection.setDefaultClientID(clientID);
277 }
278
279 return connection;
280 } catch (JMSException e) {
281 // Clean up!
282 try {
283 connection.close();
284 } catch (Throwable ignore) {
285 }
286 throw e;
287 } catch (Exception e) {
288 // Clean up!
289 try {
290 connection.close();
291 } catch (Throwable ignore) {
292 }
293 throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
294 }
295 }
296
297 protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
298 ActiveMQConnection connection = new ActiveMQConnection(transport, getClientIdGenerator(),
299 getConnectionIdGenerator(), stats);
300 return connection;
301 }
302
/**
 * Copies this factory's settings onto a newly created connection. Called
 * from createActiveMQConnection(String, String) after the credentials are
 * set and before the transport is started.
 *
 * @param connection the connection to configure
 * @throws JMSException if a setting cannot be applied
 */
protected void configureConnection(ActiveMQConnection connection) throws JMSException {
    // Values are read through the getters, so subclass overrides are honoured.
    connection.setPrefetchPolicy(getPrefetchPolicy());
    connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
    connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
    connection.setCopyMessageOnSend(isCopyMessageOnSend());
    connection.setUseCompression(isUseCompression());
    connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
    connection.setDispatchAsync(isDispatchAsync());
    connection.setUseAsyncSend(isUseAsyncSend());
    connection.setAlwaysSyncSend(isAlwaysSyncSend());
    connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
    connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
    connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
    connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
    connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
    connection.setExclusiveConsumer(isExclusiveConsumer());
    connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
    connection.setTransformer(getTransformer());
    // The blob policy is passed as the result of copy(), not the factory's own instance.
    connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
    connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
    connection.setProducerWindowSize(getProducerWindowSize());
    connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
    connection.setSendTimeout(getSendTimeout());
    connection.setCloseTimeout(getCloseTimeout());
    connection.setSendAcksAsync(isSendAcksAsync());
    connection.setAuditDepth(getAuditDepth());
    connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
    connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
    connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
    connection.setCheckForDuplicates(isCheckForDuplicates());
    connection.setMessagePrioritySupported(isMessagePrioritySupported());
    connection.setTransactedIndividualAck(isTransactedIndividualAck());
    connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
    connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
    connection.setSessionTaskRunner(getSessionTaskRunner());
    connection.setRejectedTaskHandler(getRejectedTaskHandler());
    connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
    // Listeners are optional: each is registered only when set on the factory.
    if (transportListener != null) {
        connection.addTransportListener(transportListener);
    }
    if (exceptionListener != null) {
        connection.setExceptionListener(exceptionListener);
    }
    if (clientInternalExceptionListener != null) {
        connection.setClientInternalExceptionListener(clientInternalExceptionListener);
    }
}
350
351 // /////////////////////////////////////////////
352 //
353 // Property Accessors
354 //
355 // /////////////////////////////////////////////
356
357 public String getBrokerURL() {
358 return brokerURL == null ? null : brokerURL.toString();
359 }
360
361 /**
362 * Sets the <a
363 * href="http://activemq.apache.org/configuring-transports.html">connection
364 * URL</a> used to connect to the ActiveMQ broker.
365 */
366 public void setBrokerURL(String brokerURL) {
367 this.brokerURL = createURI(brokerURL);
368
369 // Use all the properties prefixed with 'jms.' to set the connection
370 // factory
371 // options.
372 if (this.brokerURL.getQuery() != null) {
373 // It might be a standard URI or...
374 try {
375
376 Map<String,String> map = URISupport.parseQuery(this.brokerURL.getQuery());
377 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(map, "jms.");
378 if (buildFromMap(jmsOptionsMap)) {
379 if (!jmsOptionsMap.isEmpty()) {
380 String msg = "There are " + jmsOptionsMap.size()
381 + " jms options that couldn't be set on the ConnectionFactory."
382 + " Check the options are spelled correctly."
383 + " Unknown parameters=[" + jmsOptionsMap + "]."
384 + " This connection factory cannot be started.";
385 throw new IllegalArgumentException(msg);
386 }
387
388 this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
389 }
390
391 } catch (URISyntaxException e) {
392 }
393
394 } else {
395
396 // It might be a composite URI.
397 try {
398 CompositeData data = URISupport.parseComposite(this.brokerURL);
399 Map<String,Object> jmsOptionsMap = IntrospectionSupport.extractProperties(data.getParameters(), "jms.");
400 if (buildFromMap(jmsOptionsMap)) {
401 if (!jmsOptionsMap.isEmpty()) {
402 String msg = "There are " + jmsOptionsMap.size()
403 + " jms options that couldn't be set on the ConnectionFactory."
404 + " Check the options are spelled correctly."
405 + " Unknown parameters=[" + jmsOptionsMap + "]."
406 + " This connection factory cannot be started.";
407 throw new IllegalArgumentException(msg);
408 }
409
410 this.brokerURL = data.toURI();
411 }
412 } catch (URISyntaxException e) {
413 }
414 }
415 }
416
417 public String getClientID() {
418 return clientID;
419 }
420
421 /**
422 * Sets the JMS clientID to use for the created connection. Note that this
423 * can only be used by one connection at once so generally its a better idea
424 * to set the clientID on a Connection
425 */
426 public void setClientID(String clientID) {
427 this.clientID = clientID;
428 }
429
430 public boolean isCopyMessageOnSend() {
431 return copyMessageOnSend;
432 }
433
434 /**
435 * Should a JMS message be copied to a new JMS Message object as part of the
436 * send() method in JMS. This is enabled by default to be compliant with the
437 * JMS specification. You can disable it if you do not mutate JMS messages
438 * after they are sent for a performance boost
439 */
440 public void setCopyMessageOnSend(boolean copyMessageOnSend) {
441 this.copyMessageOnSend = copyMessageOnSend;
442 }
443
444 public boolean isDisableTimeStampsByDefault() {
445 return disableTimeStampsByDefault;
446 }
447
448 /**
449 * Sets whether or not timestamps on messages should be disabled or not. If
450 * you disable them it adds a small performance boost.
451 */
452 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
453 this.disableTimeStampsByDefault = disableTimeStampsByDefault;
454 }
455
456 public boolean isOptimizedMessageDispatch() {
457 return optimizedMessageDispatch;
458 }
459
460 /**
461 * If this flag is set then an larger prefetch limit is used - only
462 * applicable for durable topic subscribers.
463 */
464 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
465 this.optimizedMessageDispatch = optimizedMessageDispatch;
466 }
467
468 public String getPassword() {
469 return password;
470 }
471
472 /**
473 * Sets the JMS password used for connections created from this factory
474 */
475 public void setPassword(String password) {
476 this.password = password;
477 }
478
479 public ActiveMQPrefetchPolicy getPrefetchPolicy() {
480 return prefetchPolicy;
481 }
482
483 /**
484 * Sets the <a
485 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
486 * policy</a> for consumers created by this connection.
487 */
488 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
489 this.prefetchPolicy = prefetchPolicy;
490 }
491
492 public boolean isUseAsyncSend() {
493 return useAsyncSend;
494 }
495
496 public BlobTransferPolicy getBlobTransferPolicy() {
497 return blobTransferPolicy;
498 }
499
500 /**
501 * Sets the policy used to describe how out-of-band BLOBs (Binary Large
502 * OBjects) are transferred from producers to brokers to consumers
503 */
504 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
505 this.blobTransferPolicy = blobTransferPolicy;
506 }
507
508 /**
509 * Forces the use of <a
510 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
511 * adds a massive performance boost; but means that the send() method will
512 * return immediately whether the message has been sent or not which could
513 * lead to message loss.
514 */
515 public void setUseAsyncSend(boolean useAsyncSend) {
516 this.useAsyncSend = useAsyncSend;
517 }
518
519 public synchronized boolean isWatchTopicAdvisories() {
520 return watchTopicAdvisories;
521 }
522
523 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
524 this.watchTopicAdvisories = watchTopicAdvisories;
525 }
526
527 /**
528 * @return true if always sync send messages
529 */
530 public boolean isAlwaysSyncSend() {
531 return this.alwaysSyncSend;
532 }
533
534 /**
535 * Set true if always require messages to be sync sent
536 *
537 * @param alwaysSyncSend
538 */
539 public void setAlwaysSyncSend(boolean alwaysSyncSend) {
540 this.alwaysSyncSend = alwaysSyncSend;
541 }
542
543 public String getUserName() {
544 return userName;
545 }
546
547 /**
548 * Sets the JMS userName used by connections created by this factory
549 */
550 public void setUserName(String userName) {
551 this.userName = userName;
552 }
553
554 public boolean isUseRetroactiveConsumer() {
555 return useRetroactiveConsumer;
556 }
557
558 /**
559 * Sets whether or not retroactive consumers are enabled. Retroactive
560 * consumers allow non-durable topic subscribers to receive old messages
561 * that were published before the non-durable subscriber started.
562 */
563 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
564 this.useRetroactiveConsumer = useRetroactiveConsumer;
565 }
566
567 public boolean isExclusiveConsumer() {
568 return exclusiveConsumer;
569 }
570
571 /**
572 * Enables or disables whether or not queue consumers should be exclusive or
573 * not for example to preserve ordering when not using <a
574 * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
575 *
576 * @param exclusiveConsumer
577 */
578 public void setExclusiveConsumer(boolean exclusiveConsumer) {
579 this.exclusiveConsumer = exclusiveConsumer;
580 }
581
582 public RedeliveryPolicy getRedeliveryPolicy() {
583 return redeliveryPolicyMap.getDefaultEntry();
584 }
585
586 /**
587 * Sets the global default redelivery policy to be used when a message is delivered
588 * but the session is rolled back
589 */
590 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
591 this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
592 }
593
594 public RedeliveryPolicyMap getRedeliveryPolicyMap() {
595 return this.redeliveryPolicyMap;
596 }
597
598 /**
599 * Sets the global redelivery policy mapping to be used when a message is delivered
600 * but the session is rolled back
601 */
602 public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
603 this.redeliveryPolicyMap = redeliveryPolicyMap;
604 }
605
606 public MessageTransformer getTransformer() {
607 return transformer;
608 }
609
610 /**
611 * @return the sendTimeout
612 */
613 public int getSendTimeout() {
614 return sendTimeout;
615 }
616
617 /**
618 * @param sendTimeout the sendTimeout to set
619 */
620 public void setSendTimeout(int sendTimeout) {
621 this.sendTimeout = sendTimeout;
622 }
623
624 /**
625 * @return the sendAcksAsync
626 */
627 public boolean isSendAcksAsync() {
628 return sendAcksAsync;
629 }
630
631 /**
632 * @param sendAcksAsync the sendAcksAsync to set
633 */
634 public void setSendAcksAsync(boolean sendAcksAsync) {
635 this.sendAcksAsync = sendAcksAsync;
636 }
637
638 /**
639 * @return the messagePrioritySupported
640 */
641 public boolean isMessagePrioritySupported() {
642 return this.messagePrioritySupported;
643 }
644
645 /**
646 * @param messagePrioritySupported the messagePrioritySupported to set
647 */
648 public void setMessagePrioritySupported(boolean messagePrioritySupported) {
649 this.messagePrioritySupported = messagePrioritySupported;
650 }
651
652
653 /**
654 * Sets the transformer used to transform messages before they are sent on
655 * to the JMS bus or when they are received from the bus but before they are
656 * delivered to the JMS client
657 */
658 public void setTransformer(MessageTransformer transformer) {
659 this.transformer = transformer;
660 }
661
662 @SuppressWarnings({ "unchecked", "rawtypes" })
663 @Override
664 public void buildFromProperties(Properties properties) {
665
666 if (properties == null) {
667 properties = new Properties();
668 }
669
670 String temp = properties.getProperty(Context.PROVIDER_URL);
671 if (temp == null || temp.length() == 0) {
672 temp = properties.getProperty("brokerURL");
673 }
674 if (temp != null && temp.length() > 0) {
675 setBrokerURL(temp);
676 }
677
678 Map<String, Object> p = new HashMap(properties);
679 buildFromMap(p);
680 }
681
682 public boolean buildFromMap(Map<String, Object> properties) {
683 boolean rc = false;
684
685 ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
686 if (IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.")) {
687 setPrefetchPolicy(p);
688 rc = true;
689 }
690
691 RedeliveryPolicy rp = new RedeliveryPolicy();
692 if (IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.")) {
693 setRedeliveryPolicy(rp);
694 rc = true;
695 }
696
697 BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
698 if (IntrospectionSupport.setProperties(blobTransferPolicy, properties, "blobTransferPolicy.")) {
699 setBlobTransferPolicy(blobTransferPolicy);
700 rc = true;
701 }
702
703 rc |= IntrospectionSupport.setProperties(this, properties);
704
705 return rc;
706 }
707
/**
 * Writes this factory's configuration into the given properties object.
 * The broker URL, client ID, password and user name are only written when
 * non-null. Policy settings are written under the prefetchPolicy.,
 * redeliveryPolicy. and blobTransferPolicy. prefixes.
 *
 * @param props the properties object to fill
 */
@Override
public void populateProperties(Properties props) {
    props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));

    // The broker URL is stored under both the JNDI provider key and "brokerURL".
    if (getBrokerURL() != null) {
        props.setProperty(Context.PROVIDER_URL, getBrokerURL());
        props.setProperty("brokerURL", getBrokerURL());
    }

    if (getClientID() != null) {
        props.setProperty("clientID", getClientID());
    }

    IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
    IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
    IntrospectionSupport.getProperties(getBlobTransferPolicy(), props, "blobTransferPolicy.");

    props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
    props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
    props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
    props.setProperty("optimizedMessageDispatch", Boolean.toString(isOptimizedMessageDispatch()));

    // NOTE(review): the password is written as plain text.
    if (getPassword() != null) {
        props.setProperty("password", getPassword());
    }

    props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
    props.setProperty("useCompression", Boolean.toString(isUseCompression()));
    props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
    props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));

    if (getUserName() != null) {
        props.setProperty("userName", getUserName());
    }

    props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
    props.setProperty("alwaysSessionAsync", Boolean.toString(isAlwaysSessionAsync()));
    props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
    props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
    props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
    props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
    props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
    props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
    props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
    props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
    props.setProperty("checkForDuplicates", Boolean.toString(isCheckForDuplicates()));
    props.setProperty("messagePrioritySupported", Boolean.toString(isMessagePrioritySupported()));
    props.setProperty("transactedIndividualAck", Boolean.toString(isTransactedIndividualAck()));
    props.setProperty("nonBlockingRedelivery", Boolean.toString(isNonBlockingRedelivery()));
    props.setProperty("maxThreadPoolSize", Integer.toString(getMaxThreadPoolSize()));
    props.setProperty("nestedMapAndListEnabled", Boolean.toString(isNestedMapAndListEnabled()));
}
760
761 public boolean isUseCompression() {
762 return useCompression;
763 }
764
765 /**
766 * Enables the use of compression of the message bodies
767 */
768 public void setUseCompression(boolean useCompression) {
769 this.useCompression = useCompression;
770 }
771
772 public boolean isObjectMessageSerializationDefered() {
773 return objectMessageSerializationDefered;
774 }
775
776 /**
777 * When an object is set on an ObjectMessage, the JMS spec requires the
778 * object to be serialized by that set method. Enabling this flag causes the
779 * object to not get serialized. The object may subsequently get serialized
780 * if the message needs to be sent over a socket or stored to disk.
781 */
782 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
783 this.objectMessageSerializationDefered = objectMessageSerializationDefered;
784 }
785
786 public boolean isDispatchAsync() {
787 return dispatchAsync;
788 }
789
790 /**
791 * Enables or disables the default setting of whether or not consumers have
792 * their messages <a
793 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
794 * synchronously or asynchronously by the broker</a>. For non-durable
795 * topics for example we typically dispatch synchronously by default to
796 * minimize context switches which boost performance. However sometimes its
797 * better to go slower to ensure that a single blocked consumer socket does
798 * not block delivery to other consumers.
799 *
800 * @param asyncDispatch If true then consumers created on this connection
801 * will default to having their messages dispatched
802 * asynchronously. The default value is true.
803 */
804 public void setDispatchAsync(boolean asyncDispatch) {
805 this.dispatchAsync = asyncDispatch;
806 }
807
808 /**
809 * @return Returns the closeTimeout.
810 */
811 public int getCloseTimeout() {
812 return closeTimeout;
813 }
814
815 /**
816 * Sets the timeout before a close is considered complete. Normally a
817 * close() on a connection waits for confirmation from the broker; this
818 * allows that operation to timeout to save the client hanging if there is
819 * no broker
820 */
821 public void setCloseTimeout(int closeTimeout) {
822 this.closeTimeout = closeTimeout;
823 }
824
825 /**
826 * @return Returns the alwaysSessionAsync.
827 */
828 public boolean isAlwaysSessionAsync() {
829 return alwaysSessionAsync;
830 }
831
832 /**
833 * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
834 * the Connection. However, a separate thread is always used if there is more than one session, or the session
835 * isn't in auto acknowledge or duplicates ok mode. By default this value is set to true and session dispatch
836 * happens asynchronously.
837 */
838 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
839 this.alwaysSessionAsync = alwaysSessionAsync;
840 }
841
842 /**
843 * @return Returns the optimizeAcknowledge.
844 */
845 public boolean isOptimizeAcknowledge() {
846 return optimizeAcknowledge;
847 }
848
849 /**
850 * @param optimizeAcknowledge The optimizeAcknowledge to set.
851 */
852 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
853 this.optimizeAcknowledge = optimizeAcknowledge;
854 }
855
856 /**
857 * The max time in milliseconds between optimized ack batches
858 * @param optimizeAcknowledgeTimeOut
859 */
860 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
861 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut;
862 }
863
864 public long getOptimizeAcknowledgeTimeOut() {
865 return optimizeAcknowledgeTimeOut;
866 }
867
868 public boolean isNestedMapAndListEnabled() {
869 return nestedMapAndListEnabled;
870 }
871
872 /**
873 * Enables/disables whether or not Message properties and MapMessage entries
874 * support <a
875 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
876 * Structures</a> of Map and List objects
877 */
878 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
879 this.nestedMapAndListEnabled = structuredMapsEnabled;
880 }
881
882 public String getClientIDPrefix() {
883 return clientIDPrefix;
884 }
885
886 /**
887 * Sets the prefix used by autogenerated JMS Client ID values which are used
888 * if the JMS client does not explicitly specify on.
889 *
890 * @param clientIDPrefix
891 */
892 public void setClientIDPrefix(String clientIDPrefix) {
893 this.clientIDPrefix = clientIDPrefix;
894 }
895
896 protected synchronized IdGenerator getClientIdGenerator() {
897 if (clientIdGenerator == null) {
898 if (clientIDPrefix != null) {
899 clientIdGenerator = new IdGenerator(clientIDPrefix);
900 } else {
901 clientIdGenerator = new IdGenerator();
902 }
903 }
904 return clientIdGenerator;
905 }
906
907 protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
908 this.clientIdGenerator = clientIdGenerator;
909 }
910
911 /**
912 * Sets the prefix used by connection id generator
913 * @param connectionIDPrefix
914 */
915 public void setConnectionIDPrefix(String connectionIDPrefix) {
916 this.connectionIDPrefix = connectionIDPrefix;
917 }
918
919 protected synchronized IdGenerator getConnectionIdGenerator() {
920 if (connectionIdGenerator == null) {
921 if (connectionIDPrefix != null) {
922 connectionIdGenerator = new IdGenerator(connectionIDPrefix);
923 } else {
924 connectionIdGenerator = new IdGenerator();
925 }
926 }
927 return connectionIdGenerator;
928 }
929
930 protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
931 this.connectionIdGenerator = connectionIdGenerator;
932 }
933
934 /**
935 * @return the statsEnabled
936 */
937 public boolean isStatsEnabled() {
938 return this.factoryStats.isEnabled();
939 }
940
941 /**
942 * @param statsEnabled the statsEnabled to set
943 */
944 public void setStatsEnabled(boolean statsEnabled) {
945 this.factoryStats.setEnabled(statsEnabled);
946 }
947
948 public synchronized int getProducerWindowSize() {
949 return producerWindowSize;
950 }
951
952 public synchronized void setProducerWindowSize(int producerWindowSize) {
953 this.producerWindowSize = producerWindowSize;
954 }
955
956 public long getWarnAboutUnstartedConnectionTimeout() {
957 return warnAboutUnstartedConnectionTimeout;
958 }
959
960 /**
961 * Enables the timeout from a connection creation to when a warning is
962 * generated if the connection is not properly started via
963 * {@link Connection#start()} and a message is received by a consumer. It is
964 * a very common gotcha to forget to <a
965 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
966 * the connection</a> so this option makes the default case to create a
967 * warning if the user forgets. To disable the warning just set the value to <
968 * 0 (say -1).
969 */
970 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
971 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
972 }
973
974 public TransportListener getTransportListener() {
975 return transportListener;
976 }
977
978 /**
979 * Allows a listener to be configured on the ConnectionFactory so that when this factory is used
980 * with frameworks which don't expose the Connection such as Spring JmsTemplate, you can still register
981 * a transport listener.
982 *
983 * @param transportListener sets the listener to be registered on all connections
984 * created by this factory
985 */
986 public void setTransportListener(TransportListener transportListener) {
987 this.transportListener = transportListener;
988 }
989
990
991 public ExceptionListener getExceptionListener() {
992 return exceptionListener;
993 }
994
995 /**
996 * Allows an {@link ExceptionListener} to be configured on the ConnectionFactory so that when this factory
997 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
998 * an exception listener.
999 * <p> Note: access to this exceptionLinstener will <b>not</b> be serialized if it is associated with more than
1000 * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1001 * @param exceptionListener sets the exception listener to be registered on all connections
1002 * created by this factory
1003 */
1004 public void setExceptionListener(ExceptionListener exceptionListener) {
1005 this.exceptionListener = exceptionListener;
1006 }
1007
1008 public int getAuditDepth() {
1009 return auditDepth;
1010 }
1011
1012 public void setAuditDepth(int auditDepth) {
1013 this.auditDepth = auditDepth;
1014 }
1015
1016 public int getAuditMaximumProducerNumber() {
1017 return auditMaximumProducerNumber;
1018 }
1019
1020 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
1021 this.auditMaximumProducerNumber = auditMaximumProducerNumber;
1022 }
1023
1024 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1025 this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1026 }
1027
1028 public boolean isUseDedicatedTaskRunner() {
1029 return useDedicatedTaskRunner;
1030 }
1031
1032 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
1033 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
1034 }
1035
1036 public long getConsumerFailoverRedeliveryWaitPeriod() {
1037 return consumerFailoverRedeliveryWaitPeriod;
1038 }
1039
1040 public ClientInternalExceptionListener getClientInternalExceptionListener() {
1041 return clientInternalExceptionListener;
1042 }
1043
1044 /**
1045 * Allows an {@link ClientInternalExceptionListener} to be configured on the ConnectionFactory so that when this factory
1046 * is used by frameworks which don't expose the Connection such as Spring JmsTemplate, you can register
1047 * an exception listener.
1048 * <p> Note: access to this clientInternalExceptionListener will <b>not</b> be serialized if it is associated with more than
1049 * on connection (as it will be if more than one connection is subsequently created by this connection factory)
1050 * @param clientInternalExceptionListener sets the exception listener to be registered on all connections
1051 * created by this factory
1052 */
1053 public void setClientInternalExceptionListener(
1054 ClientInternalExceptionListener clientInternalExceptionListener) {
1055 this.clientInternalExceptionListener = clientInternalExceptionListener;
1056 }
1057
1058 /**
1059 * @return the checkForDuplicates
1060 */
1061 public boolean isCheckForDuplicates() {
1062 return this.checkForDuplicates;
1063 }
1064
1065 /**
1066 * @param checkForDuplicates the checkForDuplicates to set
1067 */
1068 public void setCheckForDuplicates(boolean checkForDuplicates) {
1069 this.checkForDuplicates = checkForDuplicates;
1070 }
1071
1072 public boolean isTransactedIndividualAck() {
1073 return transactedIndividualAck;
1074 }
1075
1076 /**
1077 * when true, submit individual transacted acks immediately rather than with transaction completion.
1078 * This allows the acks to represent delivery status which can be persisted on rollback
1079 * Used in conjunction with org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter#setRewriteOnRedelivery(boolean) true
1080 */
1081 public void setTransactedIndividualAck(boolean transactedIndividualAck) {
1082 this.transactedIndividualAck = transactedIndividualAck;
1083 }
1084
1085
1086 public boolean isNonBlockingRedelivery() {
1087 return nonBlockingRedelivery;
1088 }
1089
1090 /**
1091 * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
1092 * from a rolled back transaction. This implies that message order will not be preserved and
1093 * also will result in the TransactedIndividualAck option to be enabled.
1094 */
1095 public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
1096 this.nonBlockingRedelivery = nonBlockingRedelivery;
1097 }
1098
1099 public int getMaxThreadPoolSize() {
1100 return maxThreadPoolSize;
1101 }
1102
1103 public void setMaxThreadPoolSize(int maxThreadPoolSize) {
1104 this.maxThreadPoolSize = maxThreadPoolSize;
1105 }
1106
1107 public TaskRunnerFactory getSessionTaskRunner() {
1108 return sessionTaskRunner;
1109 }
1110
1111 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1112 this.sessionTaskRunner = sessionTaskRunner;
1113 }
1114
1115 public RejectedExecutionHandler getRejectedTaskHandler() {
1116 return rejectedTaskHandler;
1117 }
1118
1119 public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
1120 this.rejectedTaskHandler = rejectedTaskHandler;
1121 }
1122
1123 /**
1124 * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
1125 * to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers
1126 * will not do any background Message acknowledgment.
1127 *
1128 * @return the scheduledOptimizedAckInterval
1129 */
1130 public long getOptimizedAckScheduledAckInterval() {
1131 return optimizedAckScheduledAckInterval;
1132 }
1133
1134 /**
1135 * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
1136 * have been configured with optimizeAcknowledge enabled.
1137 *
1138 * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
1139 */
1140 public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
1141 this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
1142 }
1143 }