001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.atomic.AtomicLong;
024
025import org.apache.activemq.filter.BooleanExpression;
026import org.apache.activemq.state.CommandVisitor;
027
028/**
029 * @openwire:marshaller code="5"
030 *
031 */
032public class ConsumerInfo extends BaseCommand implements TransientInitializer {
033
034    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;
035
036    public static final byte HIGH_PRIORITY = 10;
037    public static final byte NORMAL_PRIORITY = 0;
038    public static final byte NETWORK_CONSUMER_PRIORITY = -5;
039    public static final byte LOW_PRIORITY = -10;
040
041    protected ConsumerId consumerId;
042    protected ActiveMQDestination destination;
043    protected int prefetchSize;
044    protected int maximumPendingMessageLimit;
045    protected boolean browser;
046    protected boolean dispatchAsync;
047    protected String selector;
048    protected String clientId;
049    protected String subscriptionName;
050    protected boolean noLocal;
051    protected boolean exclusive;
052    protected boolean retroactive;
053    protected byte priority;
054    protected BrokerId[] brokerPath;
055    protected boolean optimizedAcknowledge;
056    // used by the broker
057    protected transient int currentPrefetchSize;
058    // if true, the consumer will not send range
059    protected boolean noRangeAcks;
060    // acks.
061
062    protected BooleanExpression additionalPredicate;
063    protected transient boolean networkSubscription; // this subscription
064    protected transient List<ConsumerId> networkConsumerIds; // the original consumerId
065
066    // not marshalled, populated from RemoveInfo, the last message delivered, used
067    // to suppress redelivery on prefetched messages after close
068    private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
069    private transient Map<ActiveMQDestination, AtomicLong> assignedGroupCount = new ConcurrentHashMap<>();
070    // originated from a
071    // network connection
072
073    public ConsumerInfo() {
074    }
075
076    public ConsumerInfo(ConsumerId consumerId) {
077        this.consumerId = consumerId;
078    }
079
080    public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
081        this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
082    }
083
084    public ConsumerInfo copy() {
085        ConsumerInfo info = new ConsumerInfo();
086        copy(info);
087        return info;
088    }
089
090    public void copy(ConsumerInfo info) {
091        super.copy(info);
092        info.consumerId = consumerId;
093        info.destination = destination;
094        info.prefetchSize = prefetchSize;
095        info.maximumPendingMessageLimit = maximumPendingMessageLimit;
096        info.browser = browser;
097        info.dispatchAsync = dispatchAsync;
098        info.selector = selector;
099        info.clientId = clientId;
100        info.subscriptionName = subscriptionName;
101        info.noLocal = noLocal;
102        info.exclusive = exclusive;
103        info.retroactive = retroactive;
104        info.priority = priority;
105        info.brokerPath = brokerPath;
106        info.networkSubscription = networkSubscription;
107        if (networkConsumerIds != null) {
108            if (info.networkConsumerIds==null){
109                info.networkConsumerIds=new ArrayList<ConsumerId>();
110            }
111            info.networkConsumerIds.addAll(networkConsumerIds);
112        }
113    }
114
115    public boolean isDurable() {
116        return subscriptionName != null;
117    }
118
119    @Override
120    public byte getDataStructureType() {
121        return DATA_STRUCTURE_TYPE;
122    }
123
124    /**
125     * Is used to uniquely identify the consumer to the broker.
126     *
127     * @openwire:property version=1 cache=true
128     */
129    public ConsumerId getConsumerId() {
130        return consumerId;
131    }
132
133    public void setConsumerId(ConsumerId consumerId) {
134        this.consumerId = consumerId;
135    }
136
137    /**
138     * Is this consumer a queue browser?
139     *
140     * @openwire:property version=1
141     */
142    public boolean isBrowser() {
143        return browser;
144    }
145
146    public void setBrowser(boolean browser) {
147        this.browser = browser;
148    }
149
150    /**
151     * The destination that the consumer is interested in receiving messages
152     * from. This destination could be a composite destination.
153     *
154     * @openwire:property version=1 cache=true
155     */
156    public ActiveMQDestination getDestination() {
157        return destination;
158    }
159
160    public void setDestination(ActiveMQDestination destination) {
161        this.destination = destination;
162    }
163
164    /**
165     * How many messages a broker will send to the client without receiving an
166     * ack before he stops dispatching messages to the client.
167     *
168     * @openwire:property version=1
169     */
170    public int getPrefetchSize() {
171        return prefetchSize;
172    }
173
174    public void setPrefetchSize(int prefetchSize) {
175        this.prefetchSize = prefetchSize;
176        this.currentPrefetchSize = prefetchSize;
177    }
178
179    /**
180     * How many messages a broker will keep around, above the prefetch limit,
181     * for non-durable topics before starting to discard older messages.
182     *
183     * @openwire:property version=1
184     */
185    public int getMaximumPendingMessageLimit() {
186        return maximumPendingMessageLimit;
187    }
188
189    public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
190        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
191    }
192
193    /**
194     * Should the broker dispatch a message to the consumer async? If he does it
195     * async, then he uses a more SEDA style of processing while if it is not
196     * done async, then he broker use a STP style of processing. STP is more
197     * appropriate in high bandwidth situations or when being used by and in vm
198     * transport.
199     *
200     * @openwire:property version=1
201     */
202    public boolean isDispatchAsync() {
203        return dispatchAsync;
204    }
205
206    public void setDispatchAsync(boolean dispatchAsync) {
207        this.dispatchAsync = dispatchAsync;
208    }
209
210    /**
211     * The JMS selector used to filter out messages that this consumer is
212     * interested in.
213     *
214     * @openwire:property version=1
215     */
216    public String getSelector() {
217        return selector;
218    }
219
220    public void setSelector(String selector) {
221        this.selector = selector;
222    }
223
224    /**
225     * Used to identify the id of a client connection.
226     *
227     * @openwire:property version=10
228     */
229    public String getClientId() {
230        return clientId;
231    }
232
233    public void setClientId(String clientId) {
234        this.clientId = clientId;
235    }
236
237    /**
238     * Used to identify the name of a durable subscription.
239     *
240     * @openwire:property version=1
241     */
242    public String getSubscriptionName() {
243        return subscriptionName;
244    }
245
246    public void setSubscriptionName(String durableSubscriptionId) {
247        this.subscriptionName = durableSubscriptionId;
248    }
249
250    /**
251     * Set noLocal to true to avoid receiving messages that were published
252     * locally on the same connection.
253     *
254     * @openwire:property version=1
255     */
256    public boolean isNoLocal() {
257        return noLocal;
258    }
259
260    public void setNoLocal(boolean noLocal) {
261        this.noLocal = noLocal;
262    }
263
264    /**
265     * An exclusive consumer locks out other consumers from being able to
266     * receive messages from the destination. If there are multiple exclusive
267     * consumers for a destination, the first one created will be the exclusive
268     * consumer of the destination.
269     *
270     * @openwire:property version=1
271     */
272    public boolean isExclusive() {
273        return exclusive;
274    }
275
276    public void setExclusive(boolean exclusive) {
277        this.exclusive = exclusive;
278    }
279
280    /**
281     * A retroactive consumer only has meaning for Topics. It allows a consumer
282     * to retroactively see messages sent prior to the consumer being created.
283     * If the consumer is not durable, it will be delivered the last message
284     * published to the topic. If the consumer is durable then it will receive
285     * all persistent messages that are still stored in persistent storage for
286     * that topic.
287     *
288     * @openwire:property version=1
289     */
290    public boolean isRetroactive() {
291        return retroactive;
292    }
293
294    public void setRetroactive(boolean retroactive) {
295        this.retroactive = retroactive;
296    }
297
298    public RemoveInfo createRemoveCommand() {
299        RemoveInfo command = new RemoveInfo(getConsumerId());
300        command.setResponseRequired(isResponseRequired());
301        return command;
302    }
303
304    /**
305     * The broker will avoid dispatching to a lower priority consumer if there
306     * are other higher priority consumers available to dispatch to. This allows
307     * letting the broker to have an affinity to higher priority consumers.
308     * Default priority is 0.
309     *
310     * @openwire:property version=1
311     */
312    public byte getPriority() {
313        return priority;
314    }
315
316    public void setPriority(byte priority) {
317        this.priority = priority;
318    }
319
320    /**
321     * The route of brokers the command has moved through.
322     *
323     * @openwire:property version=1 cache=true
324     */
325    public BrokerId[] getBrokerPath() {
326        return brokerPath;
327    }
328
329    public void setBrokerPath(BrokerId[] brokerPath) {
330        this.brokerPath = brokerPath;
331    }
332
333    /**
334     * A transient additional predicate that can be used it inject additional
335     * predicates into the selector on the fly. Handy if if say a Security
336     * Broker interceptor wants to filter out messages based on security level
337     * of the consumer.
338     *
339     * @openwire:property version=1
340     */
341    public BooleanExpression getAdditionalPredicate() {
342        return additionalPredicate;
343    }
344
345    public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
346        this.additionalPredicate = additionalPredicate;
347    }
348
349    @Override
350    public Response visit(CommandVisitor visitor) throws Exception {
351        return visitor.processAddConsumer(this);
352    }
353
354    /**
355     * @openwire:property version=1
356     * @return Returns the networkSubscription.
357     */
358    public boolean isNetworkSubscription() {
359        return networkSubscription;
360    }
361
362    /**
363     * @param networkSubscription The networkSubscription to set.
364     */
365    public void setNetworkSubscription(boolean networkSubscription) {
366        this.networkSubscription = networkSubscription;
367    }
368
369    /**
370     * @openwire:property version=1
371     * @return Returns the optimizedAcknowledge.
372     */
373    public boolean isOptimizedAcknowledge() {
374        return optimizedAcknowledge;
375    }
376
377    /**
378     * @param optimizedAcknowledge The optimizedAcknowledge to set.
379     */
380    public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
381        this.optimizedAcknowledge = optimizedAcknowledge;
382    }
383
384    /**
385     * @return Returns the currentPrefetchSize.
386     */
387    public int getCurrentPrefetchSize() {
388        return currentPrefetchSize;
389    }
390
391    /**
392     * @param currentPrefetchSize The currentPrefetchSize to set.
393     */
394    public void setCurrentPrefetchSize(int currentPrefetchSize) {
395        this.currentPrefetchSize = currentPrefetchSize;
396    }
397
398    /**
399     * The broker may be able to optimize it's processing or provides better QOS
400     * if it knows the consumer will not be sending ranged acks.
401     *
402     * @return true if the consumer will not send range acks.
403     * @openwire:property version=1
404     */
405    public boolean isNoRangeAcks() {
406        return noRangeAcks;
407    }
408
409    public void setNoRangeAcks(boolean noRangeAcks) {
410        this.noRangeAcks = noRangeAcks;
411    }
412
413    public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
414        if (networkConsumerIds == null) {
415            networkConsumerIds = new ArrayList<ConsumerId>();
416        }
417        networkConsumerIds.add(networkConsumerId);
418    }
419
420    public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
421        if (networkConsumerIds != null) {
422            networkConsumerIds.remove(networkConsumerId);
423            if (networkConsumerIds.isEmpty()) {
424                networkConsumerIds=null;
425            }
426        }
427    }
428
429    public synchronized boolean isNetworkConsumersEmpty() {
430        return networkConsumerIds == null || networkConsumerIds.isEmpty();
431    }
432
433    public synchronized List<ConsumerId> getNetworkConsumerIds(){
434        List<ConsumerId> result = new ArrayList<ConsumerId>();
435        if (networkConsumerIds != null) {
436            result.addAll(networkConsumerIds);
437        }
438        return result;
439    }
440
441    @Override
442    public int hashCode() {
443        return (consumerId == null) ? 0 : consumerId.hashCode();
444    }
445
446    @Override
447    public boolean equals(Object obj) {
448        if (this == obj) {
449            return true;
450        }
451        if (obj == null) {
452            return false;
453        }
454        if (getClass() != obj.getClass()) {
455            return false;
456        }
457
458        ConsumerInfo other = (ConsumerInfo) obj;
459
460        if (consumerId == null && other.consumerId != null) {
461            return false;
462        } else if (!consumerId.equals(other.consumerId)) {
463            return false;
464        }
465        return true;
466    }
467
468    /**
469     * Tracks the original subscription id that causes a subscription to
470     * percolate through a network when networkTTL > 1. Tracking the original
471     * subscription allows duplicate suppression.
472     *
473     * @return array of the current subscription path
474     * @openwire:property version=4
475     */
476    public ConsumerId[] getNetworkConsumerPath() {
477        ConsumerId[] result = null;
478        if (networkConsumerIds != null) {
479            result = networkConsumerIds.toArray(new ConsumerId[0]);
480        }
481        return result;
482    }
483
484    public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
485        if (consumerPath != null) {
486            for (int i=0; i<consumerPath.length; i++) {
487                addNetworkConsumerId(consumerPath[i]);
488            }
489        }
490    }
491
492    public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
493        this.lastDeliveredSequenceId  = lastDeliveredSequenceId;
494    }
495
496    public long getLastDeliveredSequenceId() {
497        return lastDeliveredSequenceId;
498    }
499
500    public void incrementAssignedGroupCount(final ActiveMQDestination dest) {
501        AtomicLong value = assignedGroupCount.get(dest);
502        if (value == null) {
503            value = new AtomicLong(0);
504            assignedGroupCount.put(dest, value);
505        }
506        value.incrementAndGet();
507    }
508
509    public void clearAssignedGroupCount(final ActiveMQDestination dest) {
510        assignedGroupCount.remove(dest);
511    }
512
513    public void decrementAssignedGroupCount(final ActiveMQDestination dest) {
514        AtomicLong value = assignedGroupCount.get(dest);
515        if (value != null) {
516            value.decrementAndGet();
517        }
518    }
519
520    public long getAssignedGroupCount(final ActiveMQDestination dest) {
521        long result = 0l;
522        AtomicLong value = assignedGroupCount.get(dest);
523        if (value != null) {
524            result = value.longValue();
525        }
526        return result;
527    }
528
529    @Override
530    public void initTransients() {
531        assignedGroupCount = new ConcurrentHashMap<>();
532        lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
533    }
534
535}