001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.command;
018
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.List;
022import java.util.Map;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.atomic.AtomicLong;
025
026import org.apache.activemq.filter.BooleanExpression;
027import org.apache.activemq.state.CommandVisitor;
028
029/**
030 * @openwire:marshaller code="5"
031 *
032 */
033public class ConsumerInfo extends BaseCommand implements TransientInitializer {
034
035    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_INFO;
036
037    public static final byte HIGH_PRIORITY = 10;
038    public static final byte NORMAL_PRIORITY = 0;
039    public static final byte NETWORK_CONSUMER_PRIORITY = -5;
040    public static final byte LOW_PRIORITY = -10;
041
042    protected ConsumerId consumerId;
043    protected ActiveMQDestination destination;
044    protected int prefetchSize;
045    protected int maximumPendingMessageLimit;
046    protected boolean browser;
047    protected boolean dispatchAsync;
048    protected String selector;
049    protected String clientId;
050    protected String subscriptionName;
051    protected boolean noLocal;
052    protected boolean exclusive;
053    protected boolean retroactive;
054    protected byte priority;
055    protected BrokerId[] brokerPath;
056    protected boolean optimizedAcknowledge;
057    // used by the broker
058    protected transient int currentPrefetchSize;
059    // if true, the consumer will not send range
060    protected boolean noRangeAcks;
061    // acks.
062
063    protected BooleanExpression additionalPredicate;
064    protected transient boolean networkSubscription; // this subscription
065    protected transient List<ConsumerId> networkConsumerIds; // the original consumerId
066
067    // not marshalled, populated from RemoveInfo, the last message delivered, used
068    // to suppress redelivery on prefetched messages after close
069    private transient long lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
070    private transient Map<ActiveMQDestination, AtomicLong> assignedGroupCount = new ConcurrentHashMap<>();
071    // originated from a
072    // network connection
073
074    public ConsumerInfo() {
075    }
076
077    public ConsumerInfo(ConsumerId consumerId) {
078        this.consumerId = consumerId;
079    }
080
081    public ConsumerInfo(SessionInfo sessionInfo, long consumerId) {
082        this.consumerId = new ConsumerId(sessionInfo.getSessionId(), consumerId);
083    }
084
085    public ConsumerInfo copy() {
086        ConsumerInfo info = new ConsumerInfo();
087        copy(info);
088        return info;
089    }
090
091    public void copy(ConsumerInfo info) {
092        super.copy(info);
093        info.consumerId = consumerId;
094        info.destination = destination;
095        info.prefetchSize = prefetchSize;
096        info.maximumPendingMessageLimit = maximumPendingMessageLimit;
097        info.browser = browser;
098        info.dispatchAsync = dispatchAsync;
099        info.selector = selector;
100        info.clientId = clientId;
101        info.subscriptionName = subscriptionName;
102        info.noLocal = noLocal;
103        info.exclusive = exclusive;
104        info.retroactive = retroactive;
105        info.priority = priority;
106        info.brokerPath = brokerPath;
107        info.networkSubscription = networkSubscription;
108        if (networkConsumerIds != null) {
109            if (info.networkConsumerIds==null){
110                info.networkConsumerIds=new ArrayList<ConsumerId>();
111            }
112            info.networkConsumerIds.addAll(networkConsumerIds);
113        }
114    }
115
116    public boolean isDurable() {
117        return subscriptionName != null;
118    }
119
120    @Override
121    public byte getDataStructureType() {
122        return DATA_STRUCTURE_TYPE;
123    }
124
125    /**
126     * Is used to uniquely identify the consumer to the broker.
127     *
128     * @openwire:property version=1 cache=true
129     */
130    public ConsumerId getConsumerId() {
131        return consumerId;
132    }
133
134    public void setConsumerId(ConsumerId consumerId) {
135        this.consumerId = consumerId;
136    }
137
138    /**
139     * Is this consumer a queue browser?
140     *
141     * @openwire:property version=1
142     */
143    public boolean isBrowser() {
144        return browser;
145    }
146
147    public void setBrowser(boolean browser) {
148        this.browser = browser;
149    }
150
151    /**
152     * The destination that the consumer is interested in receiving messages
153     * from. This destination could be a composite destination.
154     *
155     * @openwire:property version=1 cache=true
156     */
157    public ActiveMQDestination getDestination() {
158        return destination;
159    }
160
161    public void setDestination(ActiveMQDestination destination) {
162        this.destination = destination;
163    }
164
165    /**
166     * How many messages a broker will send to the client without receiving an
167     * ack before he stops dispatching messages to the client.
168     *
169     * @openwire:property version=1
170     */
171    public int getPrefetchSize() {
172        return prefetchSize;
173    }
174
175    public void setPrefetchSize(int prefetchSize) {
176        this.prefetchSize = prefetchSize;
177        this.currentPrefetchSize = prefetchSize;
178    }
179
180    /**
181     * How many messages a broker will keep around, above the prefetch limit,
182     * for non-durable topics before starting to discard older messages.
183     *
184     * @openwire:property version=1
185     */
186    public int getMaximumPendingMessageLimit() {
187        return maximumPendingMessageLimit;
188    }
189
190    public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
191        this.maximumPendingMessageLimit = maximumPendingMessageLimit;
192    }
193
194    /**
195     * Should the broker dispatch a message to the consumer async? If he does it
196     * async, then he uses a more SEDA style of processing while if it is not
197     * done async, then he broker use a STP style of processing. STP is more
198     * appropriate in high bandwidth situations or when being used by and in vm
199     * transport.
200     *
201     * @openwire:property version=1
202     */
203    public boolean isDispatchAsync() {
204        return dispatchAsync;
205    }
206
207    public void setDispatchAsync(boolean dispatchAsync) {
208        this.dispatchAsync = dispatchAsync;
209    }
210
211    /**
212     * The JMS selector used to filter out messages that this consumer is
213     * interested in.
214     *
215     * @openwire:property version=1
216     */
217    public String getSelector() {
218        return selector;
219    }
220
221    public void setSelector(String selector) {
222        this.selector = selector;
223    }
224
225    /**
226     * Used to identify the id of a client connection.
227     *
228     * @openwire:property version=10
229     */
230    public String getClientId() {
231        return clientId;
232    }
233
234    public void setClientId(String clientId) {
235        this.clientId = clientId;
236    }
237
238    /**
239     * Used to identify the name of a durable subscription.
240     *
241     * @openwire:property version=1
242     */
243    public String getSubscriptionName() {
244        return subscriptionName;
245    }
246
247    public void setSubscriptionName(String durableSubscriptionId) {
248        this.subscriptionName = durableSubscriptionId;
249    }
250
251    /**
252     * Set noLocal to true to avoid receiving messages that were published
253     * locally on the same connection.
254     *
255     * @openwire:property version=1
256     */
257    public boolean isNoLocal() {
258        return noLocal;
259    }
260
261    public void setNoLocal(boolean noLocal) {
262        this.noLocal = noLocal;
263    }
264
265    /**
266     * An exclusive consumer locks out other consumers from being able to
267     * receive messages from the destination. If there are multiple exclusive
268     * consumers for a destination, the first one created will be the exclusive
269     * consumer of the destination.
270     *
271     * @openwire:property version=1
272     */
273    public boolean isExclusive() {
274        return exclusive;
275    }
276
277    public void setExclusive(boolean exclusive) {
278        this.exclusive = exclusive;
279    }
280
281    /**
282     * A retroactive consumer only has meaning for Topics. It allows a consumer
283     * to retroactively see messages sent prior to the consumer being created.
284     * If the consumer is not durable, it will be delivered the last message
285     * published to the topic. If the consumer is durable then it will receive
286     * all persistent messages that are still stored in persistent storage for
287     * that topic.
288     *
289     * @openwire:property version=1
290     */
291    public boolean isRetroactive() {
292        return retroactive;
293    }
294
295    public void setRetroactive(boolean retroactive) {
296        this.retroactive = retroactive;
297    }
298
299    public RemoveInfo createRemoveCommand() {
300        RemoveInfo command = new RemoveInfo(getConsumerId());
301        command.setResponseRequired(isResponseRequired());
302        return command;
303    }
304
305    /**
306     * The broker will avoid dispatching to a lower priority consumer if there
307     * are other higher priority consumers available to dispatch to. This allows
308     * letting the broker to have an affinity to higher priority consumers.
309     * Default priority is 0.
310     *
311     * @openwire:property version=1
312     */
313    public byte getPriority() {
314        return priority;
315    }
316
317    public void setPriority(byte priority) {
318        this.priority = priority;
319    }
320
321    /**
322     * The route of brokers the command has moved through.
323     *
324     * @openwire:property version=1 cache=true
325     */
326    public BrokerId[] getBrokerPath() {
327        return brokerPath;
328    }
329
330    public void setBrokerPath(BrokerId[] brokerPath) {
331        this.brokerPath = brokerPath;
332    }
333
334    /**
335     * A transient additional predicate that can be used it inject additional
336     * predicates into the selector on the fly. Handy if if say a Security
337     * Broker interceptor wants to filter out messages based on security level
338     * of the consumer.
339     *
340     * @openwire:property version=1
341     */
342    public BooleanExpression getAdditionalPredicate() {
343        return additionalPredicate;
344    }
345
346    public void setAdditionalPredicate(BooleanExpression additionalPredicate) {
347        this.additionalPredicate = additionalPredicate;
348    }
349
350    @Override
351    public Response visit(CommandVisitor visitor) throws Exception {
352        return visitor.processAddConsumer(this);
353    }
354
355    /**
356     * @openwire:property version=1
357     * @return Returns the networkSubscription.
358     */
359    public boolean isNetworkSubscription() {
360        return networkSubscription;
361    }
362
363    /**
364     * @param networkSubscription The networkSubscription to set.
365     */
366    public void setNetworkSubscription(boolean networkSubscription) {
367        this.networkSubscription = networkSubscription;
368    }
369
370    /**
371     * @openwire:property version=1
372     * @return Returns the optimizedAcknowledge.
373     */
374    public boolean isOptimizedAcknowledge() {
375        return optimizedAcknowledge;
376    }
377
378    /**
379     * @param optimizedAcknowledge The optimizedAcknowledge to set.
380     */
381    public void setOptimizedAcknowledge(boolean optimizedAcknowledge) {
382        this.optimizedAcknowledge = optimizedAcknowledge;
383    }
384
385    /**
386     * @return Returns the currentPrefetchSize.
387     */
388    public int getCurrentPrefetchSize() {
389        return currentPrefetchSize;
390    }
391
392    /**
393     * @param currentPrefetchSize The currentPrefetchSize to set.
394     */
395    public void setCurrentPrefetchSize(int currentPrefetchSize) {
396        this.currentPrefetchSize = currentPrefetchSize;
397    }
398
399    /**
400     * The broker may be able to optimize it's processing or provides better QOS
401     * if it knows the consumer will not be sending ranged acks.
402     *
403     * @return true if the consumer will not send range acks.
404     * @openwire:property version=1
405     */
406    public boolean isNoRangeAcks() {
407        return noRangeAcks;
408    }
409
410    public void setNoRangeAcks(boolean noRangeAcks) {
411        this.noRangeAcks = noRangeAcks;
412    }
413
414    public synchronized void addNetworkConsumerId(ConsumerId networkConsumerId) {
415        if (networkConsumerIds == null) {
416            networkConsumerIds = new ArrayList<ConsumerId>();
417        }
418        networkConsumerIds.add(networkConsumerId);
419    }
420
421    public synchronized void removeNetworkConsumerId(ConsumerId networkConsumerId) {
422        if (networkConsumerIds != null) {
423            networkConsumerIds.remove(networkConsumerId);
424            if (networkConsumerIds.isEmpty()) {
425                networkConsumerIds=null;
426            }
427        }
428    }
429
430    public synchronized boolean isNetworkConsumersEmpty() {
431        return networkConsumerIds == null || networkConsumerIds.isEmpty();
432    }
433
434    public synchronized List<ConsumerId> getNetworkConsumerIds(){
435        List<ConsumerId> result = new ArrayList<ConsumerId>();
436        if (networkConsumerIds != null) {
437            result.addAll(networkConsumerIds);
438        }
439        return result;
440    }
441
442    @Override
443    public int hashCode() {
444        return (consumerId == null) ? 0 : consumerId.hashCode();
445    }
446
447    @Override
448    public boolean equals(Object obj) {
449        if (this == obj) {
450            return true;
451        }
452        if (obj == null) {
453            return false;
454        }
455        if (getClass() != obj.getClass()) {
456            return false;
457        }
458
459        ConsumerInfo other = (ConsumerInfo) obj;
460
461        if (consumerId == null && other.consumerId != null) {
462            return false;
463        } else if (!consumerId.equals(other.consumerId)) {
464            return false;
465        }
466        return true;
467    }
468
469    /**
470     * Tracks the original subscription id that causes a subscription to
471     * percolate through a network when networkTTL > 1. Tracking the original
472     * subscription allows duplicate suppression.
473     *
474     * @return array of the current subscription path
475     * @openwire:property version=4
476     */
477    public ConsumerId[] getNetworkConsumerPath() {
478        ConsumerId[] result = null;
479        if (networkConsumerIds != null) {
480            result = networkConsumerIds.toArray(new ConsumerId[0]);
481        }
482        return result;
483    }
484
485    public void setNetworkConsumerPath(ConsumerId[] consumerPath) {
486        if (consumerPath != null) {
487            for (int i=0; i<consumerPath.length; i++) {
488                addNetworkConsumerId(consumerPath[i]);
489            }
490        }
491    }
492
493    public void setLastDeliveredSequenceId(long lastDeliveredSequenceId) {
494        this.lastDeliveredSequenceId  = lastDeliveredSequenceId;
495    }
496
497    public long getLastDeliveredSequenceId() {
498        return lastDeliveredSequenceId;
499    }
500
501    public void incrementAssignedGroupCount(final ActiveMQDestination dest) {
502        AtomicLong value = assignedGroupCount.get(dest);
503        if (value == null) {
504            value = new AtomicLong(0);
505            assignedGroupCount.put(dest, value);
506        }
507        value.incrementAndGet();
508    }
509
510    public void clearAssignedGroupCount(final ActiveMQDestination dest) {
511        assignedGroupCount.remove(dest);
512    }
513
514    public void decrementAssignedGroupCount(final ActiveMQDestination dest) {
515        AtomicLong value = assignedGroupCount.get(dest);
516        if (value != null) {
517            value.decrementAndGet();
518        }
519    }
520
521    public long getAssignedGroupCount(final ActiveMQDestination dest) {
522        long result = 0l;
523        AtomicLong value = assignedGroupCount.get(dest);
524        if (value != null) {
525            result = value.longValue();
526        }
527        return result;
528    }
529
530    @Override
531    public void initTransients() {
532        assignedGroupCount = new ConcurrentHashMap<>();
533        lastDeliveredSequenceId = RemoveInfo.LAST_DELIVERED_UNSET;
534    }
535
536    @Override
537    public String toString() {
538        HashMap<String, Object> overrideFields = new HashMap<String, Object>();
539        overrideFields.put("networkConsumerIds", networkConsumerIds);
540        return super.toString(overrideFields);
541    }
542
543}