001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.region.group;
018    
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.memory.LRUMap;
023    
024    /**
025     * Uses hash-code buckets to associate consumers with sets of message group IDs.
026     * 
027     * 
028     */
029    public class MessageGroupHashBucket implements MessageGroupMap {
030    
031        private final int bucketCount;
032        private final ConsumerId[] consumers;
033        private final LRUMap<String,String>cache;
034    
035        public MessageGroupHashBucket(int bucketCount, int cachedSize) {
036            this.bucketCount = bucketCount;
037            this.consumers = new ConsumerId[bucketCount];
038            this.cache=new LRUMap<String,String>(cachedSize);
039        }
040    
041        public synchronized void put(String groupId, ConsumerId consumerId) {
042            int bucket = getBucketNumber(groupId);
043            consumers[bucket] = consumerId;
044            if (consumerId != null){
045              cache.put(groupId,consumerId.toString());
046            }
047        }
048    
049        public synchronized ConsumerId get(String groupId) {
050            int bucket = getBucketNumber(groupId);
051            //excersise cache
052            cache.get(groupId);
053            return consumers[bucket];
054        }
055    
056        public synchronized ConsumerId removeGroup(String groupId) {
057            int bucket = getBucketNumber(groupId);
058            ConsumerId answer = consumers[bucket];
059            consumers[bucket] = null;
060            cache.remove(groupId);
061            return answer;
062        }
063    
064        public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) {
065            MessageGroupSet answer = null;
066            for (int i = 0; i < consumers.length; i++) {
067                ConsumerId owner = consumers[i];
068                if (owner != null && owner.equals(consumerId)) {
069                    answer = createMessageGroupSet(i, answer);
070                    consumers[i] = null;
071                }
072            }
073            if (answer == null) {
074                // make an empty set
075                answer = EmptyMessageGroupSet.INSTANCE;
076            }
077            return answer;
078        }
079    
080        public synchronized void removeAll(){
081            for (int i =0; i < consumers.length; i++){
082                consumers[i] = null;
083            }
084        }
085    
086        @Override
087        public Map<String, String> getGroups() {
088            return cache;
089        }
090    
091        @Override
092        public String getType() {
093            return "bucket";
094        }
095    
096        public int getBucketCount(){
097            return bucketCount;
098        }
099    
100    
101        public String toString() {
102            int count = 0;
103            for (int i = 0; i < consumers.length; i++) {
104                if (consumers[i] != null) {
105                    count++;
106                }
107            }
108            return "active message group buckets: " + count;
109        }
110    
111        protected MessageGroupSet createMessageGroupSet(int bucketNumber, final MessageGroupSet parent) {
112            final MessageGroupSet answer = createMessageGroupSet(bucketNumber);
113            if (parent == null) {
114                return answer;
115            } else {
116                // union the two sets together
117                return new MessageGroupSet() {
118                    public boolean contains(String groupID) {
119                        return parent.contains(groupID) || answer.contains(groupID);
120                    }
121                };
122            }
123        }
124    
125        protected MessageGroupSet createMessageGroupSet(final int bucketNumber) {
126            return new MessageGroupSet() {
127                public boolean contains(String groupID) {
128                    int bucket = getBucketNumber(groupID);
129                    return bucket == bucketNumber;
130                }
131            };
132        }
133    
134        protected int getBucketNumber(String groupId) {
135            int bucket = groupId.hashCode() % bucketCount;
136            // bucket could be negative
137            if (bucket < 0) {
138                bucket *= -1;
139            }
140            return bucket;
141        }
142    }