001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.disk.index;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.IOException;
022    import java.util.Iterator;
023    import java.util.Map;
024    import java.util.Map.Entry;
025    import java.util.concurrent.atomic.AtomicBoolean;
026    
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    import org.apache.activemq.store.kahadb.disk.page.Page;
030    import org.apache.activemq.store.kahadb.disk.page.PageFile;
031    import org.apache.activemq.store.kahadb.disk.page.Transaction;
032    import org.apache.activemq.store.kahadb.disk.util.Marshaller;
033    import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
034    
035    /**
036     * BTree implementation
037     * 
038     * 
039     */
040    public class HashIndex<Key,Value> implements Index<Key,Value> {
041    
042        public static final int CLOSED_STATE = 1;
043        public static final int OPEN_STATE = 2;
044    
045    
046        private static final Logger LOG = LoggerFactory.getLogger(HashIndex.class);
047    
048        public static final int DEFAULT_BIN_CAPACITY;
049        public static final int DEFAULT_MAXIMUM_BIN_CAPACITY;
050        public static final int DEFAULT_MINIMUM_BIN_CAPACITY;
051        public static final int DEFAULT_LOAD_FACTOR;
052    
053        static {
054            DEFAULT_BIN_CAPACITY = Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
055            DEFAULT_MAXIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
056            DEFAULT_MINIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("minimumCapacity", "16"));
057            DEFAULT_LOAD_FACTOR = Integer.parseInt(System.getProperty("defaultLoadFactor", "75"));
058        }
059    
060        private AtomicBoolean loaded = new AtomicBoolean();
061    
062    
063        private int increaseThreshold;
064        private int decreaseThreshold;
065    
066        // Where the bin page array starts at.
067        private int maximumBinCapacity = DEFAULT_MAXIMUM_BIN_CAPACITY;
068        private int minimumBinCapacity = DEFAULT_MINIMUM_BIN_CAPACITY;
069    
070    
071    
072        // Once binsActive/binCapacity reaches the loadFactor, then we need to
073        // increase the capacity
074        private int loadFactor = DEFAULT_LOAD_FACTOR;
075    
076        private PageFile pageFile;
077        // This page holds the index metadata.
078        private long pageId;
079    
080        static class Metadata {
081            
082            private Page<Metadata> page;
083            
084            // When the index is initializing or resizing.. state changes so that
085            // on failure it can be properly recovered.
086            private int state;
087            private long binPageId;
088            private int binCapacity = DEFAULT_BIN_CAPACITY;
089            private int binsActive;
090            private int size;
091    
092            
093            public void read(DataInput is) throws IOException {
094                state = is.readInt();
095                binPageId = is.readLong();
096                binCapacity = is.readInt();
097                size = is.readInt();
098                binsActive = is.readInt();
099            }
100            public void write(DataOutput os) throws IOException {
101                os.writeInt(state);
102                os.writeLong(binPageId);
103                os.writeInt(binCapacity);
104                os.writeInt(size);
105                os.writeInt(binsActive);
106            }
107            
108            static class Marshaller extends VariableMarshaller<Metadata> {
109                public Metadata readPayload(DataInput dataIn) throws IOException {
110                    Metadata rc = new Metadata();
111                    rc.read(dataIn);
112                    return rc;
113                }
114    
115                public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
116                    object.write(dataOut);
117                }
118            }
119        }
120        
121        private Metadata metadata = new Metadata();
122        
123        private Metadata.Marshaller metadataMarshaller = new Metadata.Marshaller();
124        private HashBin.Marshaller<Key,Value> hashBinMarshaller = new HashBin.Marshaller<Key,Value>(this);
125        private Marshaller<Key> keyMarshaller;
126        private Marshaller<Value> valueMarshaller;
127    
128        
129        /**
130         * Constructor
131         * 
132         * @param directory
133         * @param name
134         * @param indexManager
135         * @param numberOfBins
136         * @throws IOException
137         */
138        public HashIndex(PageFile pageFile, long pageId) throws IOException {
139            this.pageFile = pageFile;
140            this.pageId = pageId;
141        }
142    
143        public synchronized void load(Transaction tx) throws IOException {
144            if (loaded.compareAndSet(false, true)) {
145                final Page<Metadata> metadataPage = tx.load(pageId, metadataMarshaller);
146                // Is this a brand new index?
147                if (metadataPage.getType() == Page.PAGE_FREE_TYPE) {
148                    // We need to create the pages for the bins
149                    Page binPage = tx.allocate(metadata.binCapacity);
150                    metadata.binPageId = binPage.getPageId();
151                    metadata.page = metadataPage;
152                    metadataPage.set(metadata);
153                    clear(tx);
154    
155                    // If failure happens now we can continue initializing the
156                    // the hash bins...
157                } else {
158    
159                    metadata = metadataPage.get();
160                    metadata.page = metadataPage;
161                    
162                    // If we did not have a clean shutdown...
163                    if (metadata.state == OPEN_STATE ) {
164                        // Figure out the size and the # of bins that are
165                        // active. Yeah This loads the first page of every bin. :(
166                        // We might want to put this in the metadata page, but
167                        // then that page would be getting updated on every write.
168                        metadata.size = 0;
169                        for (int i = 0; i < metadata.binCapacity; i++) {
170                            int t = sizeOfBin(tx, i);
171                            if (t > 0) {
172                                metadata.binsActive++;
173                            }
174                            metadata.size += t;
175                        }
176                    }
177                }
178    
179                calcThresholds();
180    
181                metadata.state = OPEN_STATE;
182                tx.store(metadataPage, metadataMarshaller, true);
183                
184                LOG.debug("HashIndex loaded. Using "+metadata.binCapacity+" bins starting at page "+metadata.binPageId);
185            }
186        }
187    
188        public synchronized void unload(Transaction tx) throws IOException {
189            if (loaded.compareAndSet(true, false)) {
190                metadata.state = CLOSED_STATE;
191                tx.store(metadata.page, metadataMarshaller, true);
192            }
193        }
194    
195        private int sizeOfBin(Transaction tx, int index) throws IOException {
196            return getBin(tx, index).size();
197        }
198    
199        public synchronized Value get(Transaction tx, Key key) throws IOException {
200            assertLoaded();
201            return getBin(tx, key).get(key);
202        }
203        
204        public synchronized boolean containsKey(Transaction tx, Key key) throws IOException {
205            assertLoaded();
206            return getBin(tx, key).containsKey(key);
207        }
208    
209        synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
210            assertLoaded();
211            HashBin<Key,Value> bin = getBin(tx, key);
212    
213            int originalSize = bin.size();
214            Value result = bin.put(key,value);
215            store(tx, bin);
216    
217            int newSize = bin.size();
218    
219            if (newSize != originalSize) {
220                metadata.size++;
221                if (newSize == 1) {
222                    metadata.binsActive++;
223                }
224            }
225    
226            if (metadata.binsActive >= this.increaseThreshold) {
227                newSize = Math.min(maximumBinCapacity, metadata.binCapacity*2);
228                if(metadata.binCapacity!=newSize) {
229                    resize(tx, newSize);
230                }
231            }
232            return result;
233        }
234        
235        synchronized public Value remove(Transaction tx, Key key) throws IOException {
236            assertLoaded();
237    
238            HashBin<Key,Value> bin = getBin(tx, key);
239            int originalSize = bin.size();
240            Value result = bin.remove(key);
241            int newSize = bin.size();
242            
243            if (newSize != originalSize) {
244                store(tx, bin);
245    
246                metadata.size--;
247                if (newSize == 0) {
248                    metadata.binsActive--;
249                }
250            }
251    
252            if (metadata.binsActive <= this.decreaseThreshold) {
253                newSize = Math.max(minimumBinCapacity, metadata.binCapacity/2);
254                if(metadata.binCapacity!=newSize) {
255                    resize(tx, newSize);
256                }
257            }
258            return result;
259        }
260        
261    
262        public synchronized void clear(Transaction tx) throws IOException {
263            assertLoaded();
264            for (int i = 0; i < metadata.binCapacity; i++) {
265                long pageId = metadata.binPageId + i;
266                clearBinAtPage(tx, pageId);
267            }
268            metadata.size = 0;
269            metadata.binsActive = 0;
270        }
271        
272        public Iterator<Entry<Key, Value>> iterator(Transaction tx) throws IOException, UnsupportedOperationException {
273            throw new UnsupportedOperationException();
274        }
275    
276    
277        /**
278         * @param tx
279         * @param pageId
280         * @throws IOException
281         */
282        private void clearBinAtPage(Transaction tx, long pageId) throws IOException {
283            Page<HashBin<Key,Value>> page = tx.load(pageId, null);
284            HashBin<Key, Value> bin = new HashBin<Key,Value>();
285            bin.setPage(page);
286            page.set(bin);
287            store(tx, bin);
288        }
289    
290        public String toString() {
291            String str = "HashIndex" + System.identityHashCode(this) + ": " + pageFile;
292            return str;
293        }
294    
295        // /////////////////////////////////////////////////////////////////
296        // Implementation Methods
297        // /////////////////////////////////////////////////////////////////
298    
299        private void assertLoaded() throws IllegalStateException {
300            if( !loaded.get() ) {
301                throw new IllegalStateException("The HashIndex is not loaded");
302            }
303        }
304    
305        public synchronized void store(Transaction tx, HashBin<Key,Value> bin) throws IOException {
306            tx.store(bin.getPage(), hashBinMarshaller, true);
307        }
308    
309        // While resizing, the following contains the new resize data.
310        
311        private void resize(Transaction tx, final int newSize) throws IOException {
312            LOG.debug("Resizing to: "+newSize);
313            
314            int resizeCapacity = newSize;
315            long resizePageId = tx.allocate(resizeCapacity).getPageId();
316    
317            // In Phase 1 we copy the data to the new bins..
318            // Initialize the bins..
319            for (int i = 0; i < resizeCapacity; i++) {
320                long pageId = resizePageId + i;
321                clearBinAtPage(tx, pageId);
322            }
323    
324            metadata.binsActive = 0;
325            // Copy the data from the old bins to the new bins.
326            for (int i = 0; i < metadata.binCapacity; i++) {
327                
328                HashBin<Key,Value> bin = getBin(tx, i);
329                for (Map.Entry<Key, Value> entry : bin.getAll(tx).entrySet()) {
330                    HashBin<Key,Value> resizeBin = getBin(tx, entry.getKey(), resizePageId, resizeCapacity);
331                    resizeBin.put(entry.getKey(), entry.getValue());
332                    store(tx, resizeBin);
333                    if( resizeBin.size() == 1) {
334                        metadata.binsActive++;
335                    }
336                }
337            }
338            
339            // In phase 2 we free the old bins and switch the the new bins.
340            tx.free(metadata.binPageId, metadata.binCapacity);
341            
342            metadata.binCapacity = resizeCapacity;
343            metadata.binPageId = resizePageId;
344            metadata.state = OPEN_STATE;
345            tx.store(metadata.page, metadataMarshaller, true);
346            calcThresholds();
347    
348            LOG.debug("Resizing done.  New bins start at: "+metadata.binPageId);
349            resizeCapacity=0;
350            resizePageId=0;
351        }
352    
353        private void calcThresholds() {
354            increaseThreshold = (metadata.binCapacity * loadFactor)/100;
355            decreaseThreshold = (metadata.binCapacity * loadFactor * loadFactor ) / 20000;
356        }
357        
358        private HashBin<Key,Value> getBin(Transaction tx, Key key) throws IOException {
359            return getBin(tx, key, metadata.binPageId, metadata.binCapacity);
360        }
361    
362        private HashBin<Key,Value> getBin(Transaction tx, int i) throws IOException {
363            return getBin(tx, i, metadata.binPageId);
364        }
365        
366        private HashBin<Key,Value> getBin(Transaction tx, Key key, long basePage, int capacity) throws IOException {
367            int i = indexFor(key, capacity);
368            return getBin(tx, i, basePage);
369        }
370    
371        private HashBin<Key,Value> getBin(Transaction tx, int i, long basePage) throws IOException {
372            Page<HashBin<Key, Value>> page = tx.load(basePage + i, hashBinMarshaller);
373            HashBin<Key, Value> rc = page.get();
374            rc.setPage(page);
375            return rc;
376        }
377    
378        int indexFor(Key x, int length) {
379            return Math.abs(x.hashCode()%length);
380        }
381    
382        // /////////////////////////////////////////////////////////////////
383        // Property Accessors
384        // /////////////////////////////////////////////////////////////////
385    
386        public Marshaller<Key> getKeyMarshaller() {
387            return keyMarshaller;
388        }
389    
390        /**
391         * Set the marshaller for key objects
392         * 
393         * @param marshaller
394         */
395        public synchronized void setKeyMarshaller(Marshaller<Key> marshaller) {
396            this.keyMarshaller = marshaller;
397        }
398    
399        public Marshaller<Value> getValueMarshaller() {
400            return valueMarshaller;
401        }
402        /**
403         * Set the marshaller for value objects
404         * 
405         * @param marshaller
406         */
407        public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
408            this.valueMarshaller = valueMarshaller;
409        }
410        
411        /**
412         * @return number of bins in the index
413         */
414        public int getBinCapacity() {
415            return metadata.binCapacity;
416        }
417    
418        /**
419         * @param binCapacity
420         */
421        public void setBinCapacity(int binCapacity) {
422            if (loaded.get() && binCapacity != metadata.binCapacity) {
423                throw new RuntimeException("Pages already loaded - can't reset bin capacity");
424            }
425            metadata.binCapacity = binCapacity;
426        }
427    
428        public boolean isTransient() {
429            return false;
430        }
431    
432        /**
433         * @return the loadFactor
434         */
435        public int getLoadFactor() {
436            return loadFactor;
437        }
438    
439        /**
440         * @param loadFactor the loadFactor to set
441         */
442        public void setLoadFactor(int loadFactor) {
443            this.loadFactor = loadFactor;
444        }
445    
446        /**
447         * @return the maximumCapacity
448         */
449        public int setMaximumBinCapacity() {
450            return maximumBinCapacity;
451        }
452    
453        /**
454         * @param maximumCapacity the maximumCapacity to set
455         */
456        public void setMaximumBinCapacity(int maximumCapacity) {
457            this.maximumBinCapacity = maximumCapacity;
458        }
459    
460        public synchronized int size(Transaction tx) {
461            return metadata.size;
462        }
463    
464        public synchronized int getActiveBins() {
465            return metadata.binsActive;
466        }
467    
468        public long getBinPageId() {
469            return metadata.binPageId;
470        }
471    
472        public PageFile getPageFile() {
473            return pageFile;
474        }
475    
476        public int getBinsActive() {
477            return metadata.binsActive;
478        }
479    
480    }