001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.util;
018    
019    import java.io.Serializable;
020    import java.util.LinkedList;
021    
022    /**
023     * Holder for many bitArrays - used for message audit
024     * 
025     * 
026     */
027    public class BitArrayBin implements Serializable {
028    
029        private static final long serialVersionUID = 1L;
030        private LinkedList<BitArray> list;
031        private int maxNumberOfArrays;
032        private int firstIndex = -1;
033        private long lastInOrderBit=-1;
034    
035        /**
036         * Create a BitArrayBin to a certain window size (number of messages to
037         * keep)
038         * 
039         * @param windowSize
040         */
041        public BitArrayBin(int windowSize) {
042            maxNumberOfArrays = ((windowSize + 1) / BitArray.LONG_SIZE) + 1;
043            maxNumberOfArrays = Math.max(maxNumberOfArrays, 1);
044            list = new LinkedList<BitArray>();
045            for (int i = 0; i < maxNumberOfArrays; i++) {
046                list.add(null);
047            }
048        }
049    
050        /**
051         * Set a bit
052         * 
053         * @param index
054         * @param value
055         * @return true if set
056         */
057        public boolean setBit(long index, boolean value) {
058            boolean answer = false;
059            BitArray ba = getBitArray(index);
060            if (ba != null) {
061                int offset = getOffset(index);
062                if (offset >= 0) {
063                    answer = ba.set(offset, value);
064                }
065            }
066            return answer;
067        }
068        
069        /**
070         * Test if in order
071         * @param index
072         * @return true if next message is in order
073         */
074        public boolean isInOrder(long index) {
075            boolean result = false;
076            if (lastInOrderBit == -1) {
077                result = true;
078            } else {
079                result = lastInOrderBit + 1 == index;
080            }
081            lastInOrderBit = index;
082            return result;
083    
084        }
085    
086        /**
087         * Get the boolean value at the index
088         * 
089         * @param index
090         * @return true/false
091         */
092        public boolean getBit(long index) {
093            boolean answer = index >= firstIndex;
094            BitArray ba = getBitArray(index);
095            if (ba != null) {
096                int offset = getOffset(index);
097                if (offset >= 0) {
098                    answer = ba.get(offset);
099                    return answer;
100                }
101            } else {
102                // gone passed range for previous bins so assume set
103                answer = true;
104            }
105            return answer;
106        }
107    
108        /**
109         * Get the BitArray for the index
110         * 
111         * @param index
112         * @return BitArray
113         */
114        private BitArray getBitArray(long index) {
115            int bin = getBin(index);
116            BitArray answer = null;
117            if (bin >= 0) {
118                if (bin >= maxNumberOfArrays) {
119                    int overShoot = bin - maxNumberOfArrays + 1;
120                    while (overShoot > 0) {
121                        list.removeFirst();
122                        firstIndex += BitArray.LONG_SIZE;
123                        list.add(new BitArray());
124                        overShoot--;
125                    }
126                    
127                    bin = maxNumberOfArrays - 1;
128                }
129                answer = list.get(bin);
130                if (answer == null) {
131                    answer = new BitArray();
132                    list.set(bin, answer);
133                }
134            }
135            return answer;
136        }
137    
138        /**
139         * Get the index of the bin from the total index
140         * 
141         * @param index
142         * @return the index of the bin
143         */
144        private int getBin(long index) {
145            int answer = 0;
146            if (firstIndex < 0) {
147                firstIndex = (int) (index - (index % BitArray.LONG_SIZE));
148            } else if (firstIndex >= 0) {
149                answer = (int)((index - firstIndex) / BitArray.LONG_SIZE);
150            }
151            return answer;
152        }
153    
154        /**
155         * Get the offset into a bin from the total index
156         * 
157         * @param index
158         * @return the relative offset into a bin
159         */
160        private int getOffset(long index) {
161            int answer = 0;
162            if (firstIndex >= 0) {
163                answer = (int)((index - firstIndex) - (BitArray.LONG_SIZE * getBin(index)));
164            }
165            return answer;
166        }
167    
168        public long getLastSetIndex() {
169            long result = -1;
170            
171            if (firstIndex >=0) {
172                result = firstIndex;   
173                BitArray last = null;
174                for (int lastBitArrayIndex = maxNumberOfArrays -1; lastBitArrayIndex >= 0; lastBitArrayIndex--) {
175                    last = list.get(lastBitArrayIndex);
176                    if (last != null) {
177                        result += last.length() -1;
178                        result += lastBitArrayIndex * BitArray.LONG_SIZE;
179                        break;
180                    }
181                }
182            }
183            return result;
184        }
185    }