001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.command;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    
022    import org.apache.activemq.filter.BooleanExpression;
023    import org.apache.activemq.state.CommandVisitor;
024    
025    /**
026     * @openwire:marshaller code="5"
027     *
028     */
029    public class ConsumerInfo extends BaseCommand {
030    
031        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;
032    
033        public static final byte HIGH_PRIORITY = 10;
034        public static final byte NORMAL_PRIORITY = 0;
035        public static final byte NETWORK_CONSUMER_PRIORITY = -5;
036        public static final byte LOW_PRIORITY = -10;
037    
038        protected ConsumerId consumerId;
039        protected ActiveMQDestination destination;
040        protected int prefetchSize;
041        protected int maximumPendingMessageLimit;
042        protected boolean browser;
043        protected boolean dispatchAsync;
044        protected String selector;
045        protected String clientId;
046        protected String subscriptionName;
047        protected boolean noLocal;
048        protected boolean exclusive;
049        protected boolean retroactive;
050        protected byte priority;
051        protected BrokerId[] brokerPath;
052        protected boolean optimizedAcknowledge;
053        // used by the broker
054        protected transient int currentPrefetchSize;
055        // if true, the consumer will not send range
056        protected boolean noRangeAcks;
057        // acks.
058    
059        protected BooleanExpression additionalPredicate;
060        protected transient boolean networkSubscription; // this subscription
061        protected transient List<ConsumerId> networkConsumerIds; // the original consumerId
062    
063        // not marshalled, populated from RemoveInfo, the last message delivered, used
064        // to suppress redelivery on prefetched messages after close
065        // overload; also used at runtime to track assignment of message groups
066        private transient long lastDeliveredSequenceId;
067    
068        // originated from a
069        // network connection
070    
071        public ConsumerInfo() {
072        }
073    
074        public ConsumerInfo(ConsumerId consumerId) {
075            this.consumerId = consumerId;
076        }
077    
078        public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
079            this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
080        }
081    
082        public ConsumerInfo copy() {
083            ConsumerInfo info = new ConsumerInfo();
084            copy(info);
085            return info;
086        }
087    
088        public void copy(ConsumerInfo info) {
089            super.copy(info);
090            info.consumerId = consumerId;
091            info.destination = destination;
092            info.prefetchSize = prefetchSize;
093            info.maximumPendingMessageLimit = maximumPendingMessageLimit;
094            info.browser = browser;
095            info.dispatchAsync = dispatchAsync;
096            info.selector = selector;
097            info.clientId = clientId;
098            info.subscriptionName = subscriptionName;
099            info.noLocal = noLocal;
100            info.exclusive = exclusive;
101            info.retroactive = retroactive;
102            info.priority = priority;
103            info.brokerPath = brokerPath;
104            info.networkSubscription = networkSubscription;
105            if (networkConsumerIds != null) {
106                if (info.networkConsumerIds==null){
107                    info.networkConsumerIds=new ArrayList<ConsumerId>();
108                }
109                info.networkConsumerIds.addAll(networkConsumerIds);
110            }
111        }
112    
113        public boolean isDurable() {
114            return subscriptionName != null;
115        }
116    
117        public byte getDataStructureType() {
118            return DATA_STRUCTURE_TYPE;
119        }
120    
121        /**
122         * Is used to uniquely identify the consumer to the broker.
123         *
124         * @openwire:property version=1 cache=true
125         */
126        public ConsumerId getConsumerId() {
127            return consumerId;
128        }
129    
130        public void setConsumerId(ConsumerId consumerId) {
131            this.consumerId = consumerId;
132        }
133    
134        /**
135         * Is this consumer a queue browser?
136         *
137         * @openwire:property version=1
138         */
139        public boolean isBrowser() {
140            return browser;
141        }
142    
143        public void setBrowser(boolean browser) {
144            this.browser = browser;
145        }
146    
147        /**
148         * The destination that the consumer is interested in receiving messages
149         * from. This destination could be a composite destination.
150         *
151         * @openwire:property version=1 cache=true
152         */
153        public ActiveMQDestination getDestination() {
154            return destination;
155        }
156    
157        public void setDestination(ActiveMQDestination destination) {
158            this.destination = destination;
159        }
160    
161        /**
162         * How many messages a broker will send to the client without receiving an
163         * ack before he stops dispatching messages to the client.
164         *
165         * @openwire:property version=1
166         */
167        public int getPrefetchSize() {
168            return prefetchSize;
169        }
170    
171        public void setPrefetchSize(int prefetchSize) {
172            this.prefetchSize = prefetchSize;
173            this.currentPrefetchSize = prefetchSize;
174        }
175    
176        /**
177         * How many messages a broker will keep around, above the prefetch limit,
178         * for non-durable topics before starting to discard older messages.
179         *
180         * @openwire:property version=1
181         */
182        public int getMaximumPendingMessageLimit() {
183            return maximumPendingMessageLimit;
184        }
185    
186        public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
187            this.maximumPendingMessageLimit = maximumPendingMessageLimit;
188        }
189    
190        /**
191         * Should the broker dispatch a message to the consumer async? If he does it
192         * async, then he uses a more SEDA style of processing while if it is not
193         * done async, then he broker use a STP style of processing. STP is more
194         * appropriate in high bandwidth situations or when being used by and in vm
195         * transport.
196         *
197         * @openwire:property version=1
198         */
199        public boolean isDispatchAsync() {
200            return dispatchAsync;
201        }
202    
203        public void setDispatchAsync(boolean dispatchAsync) {
204            this.dispatchAsync = dispatchAsync;
205        }
206    
207        /**
208         * The JMS selector used to filter out messages that this consumer is
209         * interested in.
210         *
211         * @openwire:property version=1
212         */
213        public String getSelector() {
214            return selector;
215        }
216    
217        public void setSelector(String selector) {
218            this.selector = selector;
219        }
220    
221        /**
222         * Used to identify the id of a client connection.
223         *
224         * @openwire:property version=10
225         */
226        public String getClientId() {
227            return clientId;
228        }
229    
230        public void setClientId(String clientId) {
231            this.clientId = clientId;
232        }
233    
234        /**
235         * Used to identify the name of a durable subscription.
236         *
237         * @openwire:property version=1
238         */
239        public String getSubscriptionName() {
240            return subscriptionName;
241        }
242    
243        public void setSubscriptionName(String durableSubscriptionId) {
244            this.subscriptionName = durableSubscriptionId;
245        }
246    
247        /**
248         * Set noLocal to true to avoid receiving messages that were published
249         * locally on the same connection.
250         *
251         * @openwire:property version=1
252         */
253        public boolean isNoLocal() {
254            return noLocal;
255        }
256    
257        public void setNoLocal(boolean noLocal) {
258            this.noLocal = noLocal;
259        }
260    
261        /**
262         * An exclusive consumer locks out other consumers from being able to
263         * receive messages from the destination. If there are multiple exclusive
264         * consumers for a destination, the first one created will be the exclusive
265         * consumer of the destination.
266         *
267         * @openwire:property version=1
268         */
269        public boolean isExclusive() {
270            return exclusive;
271        }
272    
273        public void setExclusive(boolean exclusive) {
274            this.exclusive = exclusive;
275        }
276    
277        /**
278         * A retroactive consumer only has meaning for Topics. It allows a consumer
279         * to retroactively see messages sent prior to the consumer being created.
280         * If the consumer is not durable, it will be delivered the last message
281         * published to the topic. If the consumer is durable then it will receive
282         * all persistent messages that are still stored in persistent storage for
283         * that topic.
284         *
285         * @openwire:property version=1
286         */
287        public boolean isRetroactive() {
288            return retroactive;
289        }
290    
291        public void setRetroactive(boolean retroactive) {
292            this.retroactive = retroactive;
293        }
294    
295        public RemoveInfo createRemoveCommand() {
296            RemoveInfo command = new RemoveInfo(getConsumerId());
297            command.setResponseRequired(isResponseRequired());
298            return command;
299        }
300    
301        /**
302         * The broker will avoid dispatching to a lower priority consumer if there
303         * are other higher priority consumers available to dispatch to. This allows
304         * letting the broker to have an affinity to higher priority consumers.
305         * Default priority is 0.
306         *
307         * @openwire:property version=1
308         */
309        public byte getPriority() {
310            return priority;
311        }
312    
313        public void setPriority(byte priority) {
314            this.priority = priority;
315        }
316    
317        /**
318         * The route of brokers the command has moved through.
319         *
320         * @openwire:property version=1 cache=true
321         */
322        public BrokerId[] getBrokerPath() {
323            return brokerPath;
324        }
325    
326        public void setBrokerPath(BrokerId[] brokerPath) {
327            this.brokerPath = brokerPath;
328        }
329    
330        /**
331         * A transient additional predicate that can be used it inject additional
332         * predicates into the selector on the fly. Handy if if say a Security
333         * Broker interceptor wants to filter out messages based on security level
334         * of the consumer.
335         *
336         * @openwire:property version=1
337         */
338        public BooleanExpression getAdditionalPredicate() {
339            return additionalPredicate;
340        }
341    
342        public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
343            this.additionalPredicate = additionalPredicate;
344        }
345    
346        public Response visit(CommandVisitor visitor) throws Exception {
347            return visitor.processAddConsumer(this);
348        }
349    
350        /**
351         * @openwire:property version=1
352         * @return Returns the networkSubscription.
353         */
354        public boolean isNetworkSubscription() {
355            return networkSubscription;
356        }
357    
358        /**
359         * @param networkSubscription The networkSubscription to set.
360         */
361        public void setNetworkSubscription(boolean networkSubscription) {
362            this.networkSubscription = networkSubscription;
363        }
364    
365        /**
366         * @openwire:property version=1
367         * @return Returns the optimizedAcknowledge.
368         */
369        public boolean isOptimizedAcknowledge() {
370            return optimizedAcknowledge;
371        }
372    
373        /**
374         * @param optimizedAcknowledge The optimizedAcknowledge to set.
375         */
376        public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
377            this.optimizedAcknowledge = optimizedAcknowledge;
378        }
379    
380        /**
381         * @return Returns the currentPrefetchSize.
382         */
383        public int getCurrentPrefetchSize() {
384            return currentPrefetchSize;
385        }
386    
387        /**
388         * @param currentPrefetchSize The currentPrefetchSize to set.
389         */
390        public void setCurrentPrefetchSize(int currentPrefetchSize) {
391            this.currentPrefetchSize = currentPrefetchSize;
392        }
393    
394        /**
395         * The broker may be able to optimize it's processing or provides better QOS
396         * if it knows the consumer will not be sending ranged acks.
397         *
398         * @return true if the consumer will not send range acks.
399         * @openwire:property version=1
400         */
401        public boolean isNoRangeAcks() {
402            return noRangeAcks;
403        }
404    
405        public void setNoRangeAcks(boolean noRangeAcks) {
406            this.noRangeAcks = noRangeAcks;
407        }
408    
409        public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
410            if (networkConsumerIds == null) {
411                networkConsumerIds = new ArrayList<ConsumerId>();
412            }
413            networkConsumerIds.add(networkConsumerId);
414        }
415    
416        public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
417            if (networkConsumerIds != null) {
418                networkConsumerIds.remove(networkConsumerId);
419                if (networkConsumerIds.isEmpty()) {
420                    networkConsumerIds=null;
421                }
422            }
423        }
424    
425        public synchronized boolean isNetworkConsumersEmpty() {
426            return networkConsumerIds == null || networkConsumerIds.isEmpty();
427        }
428    
429        public synchronized List<ConsumerId> getNetworkConsumerIds(){
430            List<ConsumerId> result = new ArrayList<ConsumerId>();
431            if (networkConsumerIds != null) {
432                result.addAll(networkConsumerIds);
433            }
434            return result;
435        }
436    
437        /**
438         * Tracks the original subscription id that causes a subscription to
439         * percolate through a network when networkTTL > 1. Tracking the original
440         * subscription allows duplicate suppression.
441         *
442         * @return array of the current subscription path
443         * @openwire:property version=4
444         */
445        public ConsumerId[] getNetworkConsumerPath() {
446            ConsumerId[] result = null;
447            if (networkConsumerIds != null) {
448                result = networkConsumerIds.toArray(new ConsumerId[0]);
449            }
450            return result;
451        }
452    
453        public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
454            if (consumerPath != null) {
455                for (int i=0; i<consumerPath.length; i++) {
456                    addNetworkConsumerId(consumerPath[i]);
457                }
458            }
459        }
460    
461        public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
462            this.lastDeliveredSequenceId  = lastDeliveredSequenceId;
463        }
464    
465        public long getLastDeliveredSequenceId() {
466            return lastDeliveredSequenceId;
467        }
468    
469    }