001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.command;
018
019 import java.util.ArrayList;
020 import java.util.List;
021
022 import org.apache.activemq.filter.BooleanExpression;
023 import org.apache.activemq.state.CommandVisitor;
024
025 /**
026 * @openwire:marshaller code="5"
027 *
028 */
029 public class ConsumerInfo extends BaseCommand {
030
031 public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;
032
033 public static final byte HIGH_PRIORITY = 10;
034 public static final byte NORMAL_PRIORITY = 0;
035 public static final byte NETWORK_CONSUMER_PRIORITY = -5;
036 public static final byte LOW_PRIORITY = -10;
037
038 protected ConsumerId consumerId;
039 protected ActiveMQDestination destination;
040 protected int prefetchSize;
041 protected int maximumPendingMessageLimit;
042 protected boolean browser;
043 protected boolean dispatchAsync;
044 protected String selector;
045 protected String subscriptionName;
046 protected boolean noLocal;
047 protected boolean exclusive;
048 protected boolean retroactive;
049 protected byte priority;
050 protected BrokerId[] brokerPath;
051 protected boolean optimizedAcknowledge;
052 // used by the broker
053 protected transient int currentPrefetchSize;
054 // if true, the consumer will not send range
055 protected boolean noRangeAcks;
056 // acks.
057
058 protected BooleanExpression additionalPredicate;
059 protected transient boolean networkSubscription; // this subscription
060 protected transient List<ConsumerId> networkConsumerIds; // the original consumerId
061
062 // not marshalled, populated from RemoveInfo, the last message delivered, used
063 // to suppress redelivery on prefetched messages after close
064 // overload; also used at runtime to track assignment of message groups
065 private transient long lastDeliveredSequenceId;
066
067 // originated from a
068 // network connection
069
070 public ConsumerInfo() {
071 }
072
073 public ConsumerInfo(ConsumerId consumerId) {
074 this.consumerId = consumerId;
075 }
076
077 public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
078 this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
079 }
080
081 public ConsumerInfo copy() {
082 ConsumerInfo info = new ConsumerInfo();
083 copy(info);
084 return info;
085 }
086
087 public void copy(ConsumerInfo info) {
088 super.copy(info);
089 info.consumerId = consumerId;
090 info.destination = destination;
091 info.prefetchSize = prefetchSize;
092 info.maximumPendingMessageLimit = maximumPendingMessageLimit;
093 info.browser = browser;
094 info.dispatchAsync = dispatchAsync;
095 info.selector = selector;
096 info.subscriptionName = subscriptionName;
097 info.noLocal = noLocal;
098 info.exclusive = exclusive;
099 info.retroactive = retroactive;
100 info.priority = priority;
101 info.brokerPath = brokerPath;
102 info.networkSubscription = networkSubscription;
103 if (networkConsumerIds != null) {
104 if (info.networkConsumerIds==null){
105 info.networkConsumerIds=new ArrayList<ConsumerId>();
106 }
107 info.networkConsumerIds.addAll(networkConsumerIds);
108 }
109 }
110
111 public boolean isDurable() {
112 return subscriptionName != null;
113 }
114
115 public byte getDataStructureType() {
116 return DATA_STRUCTURE_TYPE;
117 }
118
119 /**
120 * Is used to uniquely identify the consumer to the broker.
121 *
122 * @openwire:property version=1 cache=true
123 */
124 public ConsumerId getConsumerId() {
125 return consumerId;
126 }
127
128 public void setConsumerId(ConsumerId consumerId) {
129 this.consumerId = consumerId;
130 }
131
132 /**
133 * Is this consumer a queue browser?
134 *
135 * @openwire:property version=1
136 */
137 public boolean isBrowser() {
138 return browser;
139 }
140
141 public void setBrowser(boolean browser) {
142 this.browser = browser;
143 }
144
145 /**
146 * The destination that the consumer is interested in receiving messages
147 * from. This destination could be a composite destination.
148 *
149 * @openwire:property version=1 cache=true
150 */
151 public ActiveMQDestination getDestination() {
152 return destination;
153 }
154
155 public void setDestination(ActiveMQDestination destination) {
156 this.destination = destination;
157 }
158
159 /**
160 * How many messages a broker will send to the client without receiving an
161 * ack before he stops dispatching messages to the client.
162 *
163 * @openwire:property version=1
164 */
165 public int getPrefetchSize() {
166 return prefetchSize;
167 }
168
169 public void setPrefetchSize(int prefetchSize) {
170 this.prefetchSize = prefetchSize;
171 this.currentPrefetchSize = prefetchSize;
172 }
173
174 /**
175 * How many messages a broker will keep around, above the prefetch limit,
176 * for non-durable topics before starting to discard older messages.
177 *
178 * @openwire:property version=1
179 */
180 public int getMaximumPendingMessageLimit() {
181 return maximumPendingMessageLimit;
182 }
183
184 public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
185 this.maximumPendingMessageLimit = maximumPendingMessageLimit;
186 }
187
188 /**
189 * Should the broker dispatch a message to the consumer async? If he does it
190 * async, then he uses a more SEDA style of processing while if it is not
191 * done async, then he broker use a STP style of processing. STP is more
192 * appropriate in high bandwidth situations or when being used by and in vm
193 * transport.
194 *
195 * @openwire:property version=1
196 */
197 public boolean isDispatchAsync() {
198 return dispatchAsync;
199 }
200
201 public void setDispatchAsync(boolean dispatchAsync) {
202 this.dispatchAsync = dispatchAsync;
203 }
204
205 /**
206 * The JMS selector used to filter out messages that this consumer is
207 * interested in.
208 *
209 * @openwire:property version=1
210 */
211 public String getSelector() {
212 return selector;
213 }
214
215 public void setSelector(String selector) {
216 this.selector = selector;
217 }
218
219 /**
220 * Used to identify the name of a durable subscription.
221 *
222 * @openwire:property version=1
223 */
224 public String getSubscriptionName() {
225 return subscriptionName;
226 }
227
228 public void setSubscriptionName(String durableSubscriptionId) {
229 this.subscriptionName = durableSubscriptionId;
230 }
231
232 /**
233 * Set noLocal to true to avoid receiving messages that were published
234 * locally on the same connection.
235 *
236 * @openwire:property version=1
237 */
238 public boolean isNoLocal() {
239 return noLocal;
240 }
241
242 public void setNoLocal(boolean noLocal) {
243 this.noLocal = noLocal;
244 }
245
246 /**
247 * An exclusive consumer locks out other consumers from being able to
248 * receive messages from the destination. If there are multiple exclusive
249 * consumers for a destination, the first one created will be the exclusive
250 * consumer of the destination.
251 *
252 * @openwire:property version=1
253 */
254 public boolean isExclusive() {
255 return exclusive;
256 }
257
258 public void setExclusive(boolean exclusive) {
259 this.exclusive = exclusive;
260 }
261
262 /**
263 * A retroactive consumer only has meaning for Topics. It allows a consumer
264 * to retroactively see messages sent prior to the consumer being created.
265 * If the consumer is not durable, it will be delivered the last message
266 * published to the topic. If the consumer is durable then it will receive
267 * all persistent messages that are still stored in persistent storage for
268 * that topic.
269 *
270 * @openwire:property version=1
271 */
272 public boolean isRetroactive() {
273 return retroactive;
274 }
275
276 public void setRetroactive(boolean retroactive) {
277 this.retroactive = retroactive;
278 }
279
280 public RemoveInfo createRemoveCommand() {
281 RemoveInfo command = new RemoveInfo(getConsumerId());
282 command.setResponseRequired(isResponseRequired());
283 return command;
284 }
285
286 /**
287 * The broker will avoid dispatching to a lower priority consumer if there
288 * are other higher priority consumers available to dispatch to. This allows
289 * letting the broker to have an affinity to higher priority consumers.
290 * Default priority is 0.
291 *
292 * @openwire:property version=1
293 */
294 public byte getPriority() {
295 return priority;
296 }
297
298 public void setPriority(byte priority) {
299 this.priority = priority;
300 }
301
302 /**
303 * The route of brokers the command has moved through.
304 *
305 * @openwire:property version=1 cache=true
306 */
307 public BrokerId[] getBrokerPath() {
308 return brokerPath;
309 }
310
311 public void setBrokerPath(BrokerId[] brokerPath) {
312 this.brokerPath = brokerPath;
313 }
314
315 /**
316 * A transient additional predicate that can be used it inject additional
317 * predicates into the selector on the fly. Handy if if say a Security
318 * Broker interceptor wants to filter out messages based on security level
319 * of the consumer.
320 *
321 * @openwire:property version=1
322 */
323 public BooleanExpression getAdditionalPredicate() {
324 return additionalPredicate;
325 }
326
327 public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
328 this.additionalPredicate = additionalPredicate;
329 }
330
331 public Response visit(CommandVisitor visitor) throws Exception {
332 return visitor.processAddConsumer(this);
333 }
334
335 /**
336 * @openwire:property version=1
337 * @return Returns the networkSubscription.
338 */
339 public boolean isNetworkSubscription() {
340 return networkSubscription;
341 }
342
343 /**
344 * @param networkSubscription The networkSubscription to set.
345 */
346 public void setNetworkSubscription(boolean networkSubscription) {
347 this.networkSubscription = networkSubscription;
348 }
349
350 /**
351 * @openwire:property version=1
352 * @return Returns the optimizedAcknowledge.
353 */
354 public boolean isOptimizedAcknowledge() {
355 return optimizedAcknowledge;
356 }
357
358 /**
359 * @param optimizedAcknowledge The optimizedAcknowledge to set.
360 */
361 public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
362 this.optimizedAcknowledge = optimizedAcknowledge;
363 }
364
365 /**
366 * @return Returns the currentPrefetchSize.
367 */
368 public int getCurrentPrefetchSize() {
369 return currentPrefetchSize;
370 }
371
372 /**
373 * @param currentPrefetchSize The currentPrefetchSize to set.
374 */
375 public void setCurrentPrefetchSize(int currentPrefetchSize) {
376 this.currentPrefetchSize = currentPrefetchSize;
377 }
378
379 /**
380 * The broker may be able to optimize it's processing or provides better QOS
381 * if it knows the consumer will not be sending ranged acks.
382 *
383 * @return true if the consumer will not send range acks.
384 * @openwire:property version=1
385 */
386 public boolean isNoRangeAcks() {
387 return noRangeAcks;
388 }
389
390 public void setNoRangeAcks(boolean noRangeAcks) {
391 this.noRangeAcks = noRangeAcks;
392 }
393
394 public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
395 if (networkConsumerIds == null) {
396 networkConsumerIds = new ArrayList<ConsumerId>();
397 }
398 networkConsumerIds.add(networkConsumerId);
399 }
400
401 public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
402 if (networkConsumerIds != null) {
403 networkConsumerIds.remove(networkConsumerId);
404 if (networkConsumerIds.isEmpty()) {
405 networkConsumerIds=null;
406 }
407 }
408 }
409
410 public synchronized boolean isNetworkConsumersEmpty() {
411 return networkConsumerIds == null || networkConsumerIds.isEmpty();
412 }
413
414 public synchronized List<ConsumerId> getNetworkConsumerIds(){
415 List<ConsumerId> result = new ArrayList<ConsumerId>();
416 if (networkConsumerIds != null) {
417 result.addAll(networkConsumerIds);
418 }
419 return result;
420 }
421
422 /**
423 * Tracks the original subscription id that causes a subscription to
424 * percolate through a network when networkTTL > 1. Tracking the original
425 * subscription allows duplicate suppression.
426 *
427 * @return array of the current subscription path
428 * @openwire:property version=4
429 */
430 public ConsumerId[] getNetworkConsumerPath() {
431 ConsumerId[] result = null;
432 if (networkConsumerIds != null) {
433 result = networkConsumerIds.toArray(new ConsumerId[0]);
434 }
435 return result;
436 }
437
438 public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
439 if (consumerPath != null) {
440 for (int i=0; i<consumerPath.length; i++) {
441 addNetworkConsumerId(consumerPath[i]);
442 }
443 }
444 }
445
446 public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
447 this.lastDeliveredSequenceId = lastDeliveredSequenceId;
448 }
449
450 public long getLastDeliveredSequenceId() {
451 return lastDeliveredSequenceId;
452 }
453
454 }