001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    
022    import org.apache.activemq.filter.BooleanExpression;
023    import org.apache.activemq.state.CommandVisitor;
024    
025    /**
026     * @openwire:marshaller code="5"
027     *
028     */
029    public class ConsumerInfo extends BaseCommand {
030    
031        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;
032    
033        public static final byte HIGH_PRIORITY = 10;
034        public static final byte NORMAL_PRIORITY = 0;
035        public static final byte NETWORK_CONSUMER_PRIORITY = -5;
036        public static final byte LOW_PRIORITY = -10;
037    
038        protected ConsumerId consumerId;
039        protected ActiveMQDestination destination;
040        protected int prefetchSize;
041        protected int maximumPendingMessageLimit;
042        protected boolean browser;
043        protected boolean dispatchAsync;
044        protected String selector;
045        protected String subscriptionName;
046        protected boolean noLocal;
047        protected boolean exclusive;
048        protected boolean retroactive;
049        protected byte priority;
050        protected BrokerId[] brokerPath;
051        protected boolean optimizedAcknowledge;
052        // used by the broker
053        protected transient int currentPrefetchSize;
054        // if true, the consumer will not send range
055        protected boolean noRangeAcks;
056        // acks.
057    
058        protected BooleanExpression additionalPredicate;
059        protected transient boolean networkSubscription; // this subscription
060        protected transient List<ConsumerId> networkConsumerIds; // the original consumerId
061    
062        // not marshalled, populated from RemoveInfo, the last message delivered, used
063        // to suppress redelivery on prefetched messages after close
064        // overload; also used at runtime to track assignment of message groups
065        private transient long lastDeliveredSequenceId;
066    
067        // originated from a
068        // network connection
069    
070        public ConsumerInfo() {
071        }
072    
073        public ConsumerInfo(ConsumerId consumerId) {
074            this.consumerId = consumerId;
075        }
076    
077        public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
078            this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
079        }
080    
081        public ConsumerInfo copy() {
082            ConsumerInfo info = new ConsumerInfo();
083            copy(info);
084            return info;
085        }
086    
087        public void copy(ConsumerInfo info) {
088            super.copy(info);
089            info.consumerId = consumerId;
090            info.destination = destination;
091            info.prefetchSize = prefetchSize;
092            info.maximumPendingMessageLimit = maximumPendingMessageLimit;
093            info.browser = browser;
094            info.dispatchAsync = dispatchAsync;
095            info.selector = selector;
096            info.subscriptionName = subscriptionName;
097            info.noLocal = noLocal;
098            info.exclusive = exclusive;
099            info.retroactive = retroactive;
100            info.priority = priority;
101            info.brokerPath = brokerPath;
102            info.networkSubscription = networkSubscription;
103            if (networkConsumerIds != null) {
104                if (info.networkConsumerIds==null){
105                    info.networkConsumerIds=new ArrayList<ConsumerId>();
106                }
107                info.networkConsumerIds.addAll(networkConsumerIds);
108            }
109        }
110    
111        public boolean isDurable() {
112            return subscriptionName != null;
113        }
114    
115        public byte getDataStructureType() {
116            return DATA_STRUCTURE_TYPE;
117        }
118    
119        /**
120         * Is used to uniquely identify the consumer to the broker.
121         *
122         * @openwire:property version=1 cache=true
123         */
124        public ConsumerId getConsumerId() {
125            return consumerId;
126        }
127    
128        public void setConsumerId(ConsumerId consumerId) {
129            this.consumerId = consumerId;
130        }
131    
132        /**
133         * Is this consumer a queue browser?
134         *
135         * @openwire:property version=1
136         */
137        public boolean isBrowser() {
138            return browser;
139        }
140    
141        public void setBrowser(boolean browser) {
142            this.browser = browser;
143        }
144    
145        /**
146         * The destination that the consumer is interested in receiving messages
147         * from. This destination could be a composite destination.
148         *
149         * @openwire:property version=1 cache=true
150         */
151        public ActiveMQDestination getDestination() {
152            return destination;
153        }
154    
155        public void setDestination(ActiveMQDestination destination) {
156            this.destination = destination;
157        }
158    
159        /**
160         * How many messages a broker will send to the client without receiving an
161         * ack before he stops dispatching messages to the client.
162         *
163         * @openwire:property version=1
164         */
165        public int getPrefetchSize() {
166            return prefetchSize;
167        }
168    
169        public void setPrefetchSize(int prefetchSize) {
170            this.prefetchSize = prefetchSize;
171            this.currentPrefetchSize = prefetchSize;
172        }
173    
174        /**
175         * How many messages a broker will keep around, above the prefetch limit,
176         * for non-durable topics before starting to discard older messages.
177         *
178         * @openwire:property version=1
179         */
180        public int getMaximumPendingMessageLimit() {
181            return maximumPendingMessageLimit;
182        }
183    
184        public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
185            this.maximumPendingMessageLimit = maximumPendingMessageLimit;
186        }
187    
188        /**
189         * Should the broker dispatch a message to the consumer async? If he does it
190         * async, then he uses a more SEDA style of processing while if it is not
191         * done async, then he broker use a STP style of processing. STP is more
192         * appropriate in high bandwidth situations or when being used by and in vm
193         * transport.
194         *
195         * @openwire:property version=1
196         */
197        public boolean isDispatchAsync() {
198            return dispatchAsync;
199        }
200    
201        public void setDispatchAsync(boolean dispatchAsync) {
202            this.dispatchAsync = dispatchAsync;
203        }
204    
205        /**
206         * The JMS selector used to filter out messages that this consumer is
207         * interested in.
208         *
209         * @openwire:property version=1
210         */
211        public String getSelector() {
212            return selector;
213        }
214    
215        public void setSelector(String selector) {
216            this.selector = selector;
217        }
218    
219        /**
220         * Used to identify the name of a durable subscription.
221         *
222         * @openwire:property version=1
223         */
224        public String getSubscriptionName() {
225            return subscriptionName;
226        }
227    
228        public void setSubscriptionName(String durableSubscriptionId) {
229            this.subscriptionName = durableSubscriptionId;
230        }
231    
232        /**
233         * Set noLocal to true to avoid receiving messages that were published
234         * locally on the same connection.
235         *
236         * @openwire:property version=1
237         */
238        public boolean isNoLocal() {
239            return noLocal;
240        }
241    
242        public void setNoLocal(boolean noLocal) {
243            this.noLocal = noLocal;
244        }
245    
246        /**
247         * An exclusive consumer locks out other consumers from being able to
248         * receive messages from the destination. If there are multiple exclusive
249         * consumers for a destination, the first one created will be the exclusive
250         * consumer of the destination.
251         *
252         * @openwire:property version=1
253         */
254        public boolean isExclusive() {
255            return exclusive;
256        }
257    
258        public void setExclusive(boolean exclusive) {
259            this.exclusive = exclusive;
260        }
261    
262        /**
263         * A retroactive consumer only has meaning for Topics. It allows a consumer
264         * to retroactively see messages sent prior to the consumer being created.
265         * If the consumer is not durable, it will be delivered the last message
266         * published to the topic. If the consumer is durable then it will receive
267         * all persistent messages that are still stored in persistent storage for
268         * that topic.
269         *
270         * @openwire:property version=1
271         */
272        public boolean isRetroactive() {
273            return retroactive;
274        }
275    
276        public void setRetroactive(boolean retroactive) {
277            this.retroactive = retroactive;
278        }
279    
280        public RemoveInfo createRemoveCommand() {
281            RemoveInfo command = new RemoveInfo(getConsumerId());
282            command.setResponseRequired(isResponseRequired());
283            return command;
284        }
285    
286        /**
287         * The broker will avoid dispatching to a lower priority consumer if there
288         * are other higher priority consumers available to dispatch to. This allows
289         * letting the broker to have an affinity to higher priority consumers.
290         * Default priority is 0.
291         *
292         * @openwire:property version=1
293         */
294        public byte getPriority() {
295            return priority;
296        }
297    
298        public void setPriority(byte priority) {
299            this.priority = priority;
300        }
301    
302        /**
303         * The route of brokers the command has moved through.
304         *
305         * @openwire:property version=1 cache=true
306         */
307        public BrokerId[] getBrokerPath() {
308            return brokerPath;
309        }
310    
311        public void setBrokerPath(BrokerId[] brokerPath) {
312            this.brokerPath = brokerPath;
313        }
314    
315        /**
316         * A transient additional predicate that can be used it inject additional
317         * predicates into the selector on the fly. Handy if if say a Security
318         * Broker interceptor wants to filter out messages based on security level
319         * of the consumer.
320         *
321         * @openwire:property version=1
322         */
323        public BooleanExpression getAdditionalPredicate() {
324            return additionalPredicate;
325        }
326    
327        public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
328            this.additionalPredicate = additionalPredicate;
329        }
330    
331        public Response visit(CommandVisitor visitor) throws Exception {
332            return visitor.processAddConsumer(this);
333        }
334    
335        /**
336         * @openwire:property version=1
337         * @return Returns the networkSubscription.
338         */
339        public boolean isNetworkSubscription() {
340            return networkSubscription;
341        }
342    
343        /**
344         * @param networkSubscription The networkSubscription to set.
345         */
346        public void setNetworkSubscription(boolean networkSubscription) {
347            this.networkSubscription = networkSubscription;
348        }
349    
350        /**
351         * @openwire:property version=1
352         * @return Returns the optimizedAcknowledge.
353         */
354        public boolean isOptimizedAcknowledge() {
355            return optimizedAcknowledge;
356        }
357    
358        /**
359         * @param optimizedAcknowledge The optimizedAcknowledge to set.
360         */
361        public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
362            this.optimizedAcknowledge = optimizedAcknowledge;
363        }
364    
365        /**
366         * @return Returns the currentPrefetchSize.
367         */
368        public int getCurrentPrefetchSize() {
369            return currentPrefetchSize;
370        }
371    
372        /**
373         * @param currentPrefetchSize The currentPrefetchSize to set.
374         */
375        public void setCurrentPrefetchSize(int currentPrefetchSize) {
376            this.currentPrefetchSize = currentPrefetchSize;
377        }
378    
379        /**
380         * The broker may be able to optimize it's processing or provides better QOS
381         * if it knows the consumer will not be sending ranged acks.
382         *
383         * @return true if the consumer will not send range acks.
384         * @openwire:property version=1
385         */
386        public boolean isNoRangeAcks() {
387            return noRangeAcks;
388        }
389    
390        public void setNoRangeAcks(boolean noRangeAcks) {
391            this.noRangeAcks = noRangeAcks;
392        }
393    
394        public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
395            if (networkConsumerIds == null) {
396                networkConsumerIds = new ArrayList<ConsumerId>();
397            }
398            networkConsumerIds.add(networkConsumerId);
399        }
400    
401        public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
402            if (networkConsumerIds != null) {
403                networkConsumerIds.remove(networkConsumerId);
404                if (networkConsumerIds.isEmpty()) {
405                    networkConsumerIds=null;
406                }
407            }
408        }
409    
410        public synchronized boolean isNetworkConsumersEmpty() {
411            return networkConsumerIds == null || networkConsumerIds.isEmpty();
412        }
413    
414        public synchronized List<ConsumerId> getNetworkConsumerIds(){
415            List<ConsumerId> result = new ArrayList<ConsumerId>();
416            if (networkConsumerIds != null) {
417                result.addAll(networkConsumerIds);
418            }
419            return result;
420        }
421    
422        /**
423         * Tracks the original subscription id that causes a subscription to
424         * percolate through a network when networkTTL > 1. Tracking the original
425         * subscription allows duplicate suppression.
426         *
427         * @return array of the current subscription path
428         * @openwire:property version=4
429         */
430        public ConsumerId[] getNetworkConsumerPath() {
431            ConsumerId[] result = null;
432            if (networkConsumerIds != null) {
433                result = networkConsumerIds.toArray(new ConsumerId[0]);
434            }
435            return result;
436        }
437    
438        public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
439            if (consumerPath != null) {
440                for (int i=0; i<consumerPath.length; i++) {
441                    addNetworkConsumerId(consumerPath[i]);
442                }
443            }
444        }
445    
446        public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
447            this.lastDeliveredSequenceId  = lastDeliveredSequenceId;
448        }
449    
450        public long getLastDeliveredSequenceId() {
451            return lastDeliveredSequenceId;
452        }
453    
454    }