001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.disk.index;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.Map.Entry;
025import java.util.concurrent.atomic.AtomicBoolean;
026
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029import org.apache.activemq.store.kahadb.disk.page.Page;
030import org.apache.activemq.store.kahadb.disk.page.PageFile;
031import org.apache.activemq.store.kahadb.disk.page.Transaction;
032import org.apache.activemq.store.kahadb.disk.util.Marshaller;
033import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
034
035/**
036 * BTree implementation
037 * 
038 * 
039 */
040public class HashIndex<Key,Value> implements Index<Key,Value> {
041
042    public static final int CLOSED_STATE = 1;
043    public static final int OPEN_STATE = 2;
044
045
046    private static final Logger LOG = LoggerFactory.getLogger(HashIndex.class);
047
048    public static final int DEFAULT_BIN_CAPACITY;
049    public static final int DEFAULT_MAXIMUM_BIN_CAPACITY;
050    public static final int DEFAULT_MINIMUM_BIN_CAPACITY;
051    public static final int DEFAULT_LOAD_FACTOR;
052
053    static {
054        DEFAULT_BIN_CAPACITY = Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
055        DEFAULT_MAXIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
056        DEFAULT_MINIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("minimumCapacity", "16"));
057        DEFAULT_LOAD_FACTOR = Integer.parseInt(System.getProperty("defaultLoadFactor", "75"));
058    }
059
060    private AtomicBoolean loaded = new AtomicBoolean();
061
062
063    private int increaseThreshold;
064    private int decreaseThreshold;
065
066    // Where the bin page array starts at.
067    private int maximumBinCapacity = DEFAULT_MAXIMUM_BIN_CAPACITY;
068    private int minimumBinCapacity = DEFAULT_MINIMUM_BIN_CAPACITY;
069
070
071
072    // Once binsActive/binCapacity reaches the loadFactor, then we need to
073    // increase the capacity
074    private int loadFactor = DEFAULT_LOAD_FACTOR;
075
076    private PageFile pageFile;
077    // This page holds the index metadata.
078    private long pageId;
079
080    static class Metadata {
081        
082        private Page<Metadata> page;
083        
084        // When the index is initializing or resizing.. state changes so that
085        // on failure it can be properly recovered.
086        private int state;
087        private long binPageId;
088        private int binCapacity = DEFAULT_BIN_CAPACITY;
089        private int binsActive;
090        private int size;
091
092        
093        public void read(DataInput is) throws IOException {
094            state = is.readInt();
095            binPageId = is.readLong();
096            binCapacity = is.readInt();
097            size = is.readInt();
098            binsActive = is.readInt();
099        }
100        public void write(DataOutput os) throws IOException {
101            os.writeInt(state);
102            os.writeLong(binPageId);
103            os.writeInt(binCapacity);
104            os.writeInt(size);
105            os.writeInt(binsActive);
106        }
107        
108        static class Marshaller extends VariableMarshaller<Metadata> {
109            public Metadata readPayload(DataInput dataIn) throws IOException {
110                Metadata rc = new Metadata();
111                rc.read(dataIn);
112                return rc;
113            }
114
115            public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
116                object.write(dataOut);
117            }
118        }
119    }
120    
121    private Metadata metadata = new Metadata();
122    
123    private Metadata.Marshaller metadataMarshaller = new Metadata.Marshaller();
124    private HashBin.Marshaller<Key,Value> hashBinMarshaller = new HashBin.Marshaller<Key,Value>(this);
125    private Marshaller<Key> keyMarshaller;
126    private Marshaller<Value> valueMarshaller;
127
128    
129    /**
130     * Constructor
131     * 
132     * @param directory
133     * @param name
134     * @param indexManager
135     * @param numberOfBins
136     * @throws IOException
137     */
138    public HashIndex(PageFile pageFile, long pageId) throws IOException {
139        this.pageFile = pageFile;
140        this.pageId = pageId;
141    }
142
143    public synchronized void load(Transaction tx) throws IOException {
144        if (loaded.compareAndSet(false, true)) {
145            final Page<Metadata> metadataPage = tx.load(pageId, metadataMarshaller);
146            // Is this a brand new index?
147            if (metadataPage.getType() == Page.PAGE_FREE_TYPE) {
148                // We need to create the pages for the bins
149                Page binPage = tx.allocate(metadata.binCapacity);
150                metadata.binPageId = binPage.getPageId();
151                metadata.page = metadataPage;
152                metadataPage.set(metadata);
153                clear(tx);
154
155                // If failure happens now we can continue initializing the
156                // the hash bins...
157            } else {
158
159                metadata = metadataPage.get();
160                metadata.page = metadataPage;
161                
162                // If we did not have a clean shutdown...
163                if (metadata.state == OPEN_STATE ) {
164                    // Figure out the size and the # of bins that are
165                    // active. Yeah This loads the first page of every bin. :(
166                    // We might want to put this in the metadata page, but
167                    // then that page would be getting updated on every write.
168                    metadata.size = 0;
169                    for (int i = 0; i < metadata.binCapacity; i++) {
170                        int t = sizeOfBin(tx, i);
171                        if (t > 0) {
172                            metadata.binsActive++;
173                        }
174                        metadata.size += t;
175                    }
176                }
177            }
178
179            calcThresholds();
180
181            metadata.state = OPEN_STATE;
182            tx.store(metadataPage, metadataMarshaller, true);
183            
184            LOG.debug("HashIndex loaded. Using "+metadata.binCapacity+" bins starting at page "+metadata.binPageId);
185        }
186    }
187
188    public synchronized void unload(Transaction tx) throws IOException {
189        if (loaded.compareAndSet(true, false)) {
190            metadata.state = CLOSED_STATE;
191            tx.store(metadata.page, metadataMarshaller, true);
192        }
193    }
194
195    private int sizeOfBin(Transaction tx, int index) throws IOException {
196        return getBin(tx, index).size();
197    }
198
199    public synchronized Value get(Transaction tx, Key key) throws IOException {
200        assertLoaded();
201        return getBin(tx, key).get(key);
202    }
203    
204    public synchronized boolean containsKey(Transaction tx, Key key) throws IOException {
205        assertLoaded();
206        return getBin(tx, key).containsKey(key);
207    }
208
209    synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
210        assertLoaded();
211        HashBin<Key,Value> bin = getBin(tx, key);
212
213        int originalSize = bin.size();
214        Value result = bin.put(key,value);
215        store(tx, bin);
216
217        int newSize = bin.size();
218
219        if (newSize != originalSize) {
220            metadata.size++;
221            if (newSize == 1) {
222                metadata.binsActive++;
223            }
224        }
225
226        if (metadata.binsActive >= this.increaseThreshold) {
227            newSize = Math.min(maximumBinCapacity, metadata.binCapacity*2);
228            if(metadata.binCapacity!=newSize) {
229                resize(tx, newSize);
230            }
231        }
232        return result;
233    }
234    
235    synchronized public Value remove(Transaction tx, Key key) throws IOException {
236        assertLoaded();
237
238        HashBin<Key,Value> bin = getBin(tx, key);
239        int originalSize = bin.size();
240        Value result = bin.remove(key);
241        int newSize = bin.size();
242        
243        if (newSize != originalSize) {
244            store(tx, bin);
245
246            metadata.size--;
247            if (newSize == 0) {
248                metadata.binsActive--;
249            }
250        }
251
252        if (metadata.binsActive <= this.decreaseThreshold) {
253            newSize = Math.max(minimumBinCapacity, metadata.binCapacity/2);
254            if(metadata.binCapacity!=newSize) {
255                resize(tx, newSize);
256            }
257        }
258        return result;
259    }
260    
261
262    public synchronized void clear(Transaction tx) throws IOException {
263        assertLoaded();
264        for (int i = 0; i < metadata.binCapacity; i++) {
265            long pageId = metadata.binPageId + i;
266            clearBinAtPage(tx, pageId);
267        }
268        metadata.size = 0;
269        metadata.binsActive = 0;
270    }
271    
272    public Iterator<Entry<Key, Value>> iterator(Transaction tx) throws IOException, UnsupportedOperationException {
273        throw new UnsupportedOperationException();
274    }
275
276
277    /**
278     * @param tx
279     * @param pageId
280     * @throws IOException
281     */
282    private void clearBinAtPage(Transaction tx, long pageId) throws IOException {
283        Page<HashBin<Key,Value>> page = tx.load(pageId, null);
284        HashBin<Key, Value> bin = new HashBin<Key,Value>();
285        bin.setPage(page);
286        page.set(bin);
287        store(tx, bin);
288    }
289
290    public String toString() {
291        String str = "HashIndex" + System.identityHashCode(this) + ": " + pageFile;
292        return str;
293    }
294
295    // /////////////////////////////////////////////////////////////////
296    // Implementation Methods
297    // /////////////////////////////////////////////////////////////////
298
299    private void assertLoaded() throws IllegalStateException {
300        if( !loaded.get() ) {
301            throw new IllegalStateException("The HashIndex is not loaded");
302        }
303    }
304
305    public synchronized void store(Transaction tx, HashBin<Key,Value> bin) throws IOException {
306        tx.store(bin.getPage(), hashBinMarshaller, true);
307    }
308
309    // While resizing, the following contains the new resize data.
310    
311    private void resize(Transaction tx, final int newSize) throws IOException {
312        LOG.debug("Resizing to: "+newSize);
313        
314        int resizeCapacity = newSize;
315        long resizePageId = tx.allocate(resizeCapacity).getPageId();
316
317        // In Phase 1 we copy the data to the new bins..
318        // Initialize the bins..
319        for (int i = 0; i < resizeCapacity; i++) {
320            long pageId = resizePageId + i;
321            clearBinAtPage(tx, pageId);
322        }
323
324        metadata.binsActive = 0;
325        // Copy the data from the old bins to the new bins.
326        for (int i = 0; i < metadata.binCapacity; i++) {
327            
328            HashBin<Key,Value> bin = getBin(tx, i);
329            for (Map.Entry<Key, Value> entry : bin.getAll(tx).entrySet()) {
330                HashBin<Key,Value> resizeBin = getBin(tx, entry.getKey(), resizePageId, resizeCapacity);
331                resizeBin.put(entry.getKey(), entry.getValue());
332                store(tx, resizeBin);
333                if( resizeBin.size() == 1) {
334                    metadata.binsActive++;
335                }
336            }
337        }
338        
339        // In phase 2 we free the old bins and switch the the new bins.
340        tx.free(metadata.binPageId, metadata.binCapacity);
341        
342        metadata.binCapacity = resizeCapacity;
343        metadata.binPageId = resizePageId;
344        metadata.state = OPEN_STATE;
345        tx.store(metadata.page, metadataMarshaller, true);
346        calcThresholds();
347
348        LOG.debug("Resizing done.  New bins start at: "+metadata.binPageId);
349        resizeCapacity=0;
350        resizePageId=0;
351    }
352
353    private void calcThresholds() {
354        increaseThreshold = (metadata.binCapacity * loadFactor)/100;
355        decreaseThreshold = (metadata.binCapacity * loadFactor * loadFactor ) / 20000;
356    }
357    
358    private HashBin<Key,Value> getBin(Transaction tx, Key key) throws IOException {
359        return getBin(tx, key, metadata.binPageId, metadata.binCapacity);
360    }
361
362    private HashBin<Key,Value> getBin(Transaction tx, int i) throws IOException {
363        return getBin(tx, i, metadata.binPageId);
364    }
365    
366    private HashBin<Key,Value> getBin(Transaction tx, Key key, long basePage, int capacity) throws IOException {
367        int i = indexFor(key, capacity);
368        return getBin(tx, i, basePage);
369    }
370
371    private HashBin<Key,Value> getBin(Transaction tx, int i, long basePage) throws IOException {
372        Page<HashBin<Key, Value>> page = tx.load(basePage + i, hashBinMarshaller);
373        HashBin<Key, Value> rc = page.get();
374        rc.setPage(page);
375        return rc;
376    }
377
378    int indexFor(Key x, int length) {
379        return Math.abs(x.hashCode()%length);
380    }
381
382    // /////////////////////////////////////////////////////////////////
383    // Property Accessors
384    // /////////////////////////////////////////////////////////////////
385
386    public Marshaller<Key> getKeyMarshaller() {
387        return keyMarshaller;
388    }
389
390    /**
391     * Set the marshaller for key objects
392     * 
393     * @param marshaller
394     */
395    public synchronized void setKeyMarshaller(Marshaller<Key> marshaller) {
396        this.keyMarshaller = marshaller;
397    }
398
399    public Marshaller<Value> getValueMarshaller() {
400        return valueMarshaller;
401    }
402    /**
403     * Set the marshaller for value objects
404     * 
405     * @param marshaller
406     */
407    public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
408        this.valueMarshaller = valueMarshaller;
409    }
410    
411    /**
412     * @return number of bins in the index
413     */
414    public int getBinCapacity() {
415        return metadata.binCapacity;
416    }
417
418    /**
419     * @param binCapacity
420     */
421    public void setBinCapacity(int binCapacity) {
422        if (loaded.get() && binCapacity != metadata.binCapacity) {
423            throw new RuntimeException("Pages already loaded - can't reset bin capacity");
424        }
425        metadata.binCapacity = binCapacity;
426    }
427
428    public boolean isTransient() {
429        return false;
430    }
431
432    /**
433     * @return the loadFactor
434     */
435    public int getLoadFactor() {
436        return loadFactor;
437    }
438
439    /**
440     * @param loadFactor the loadFactor to set
441     */
442    public void setLoadFactor(int loadFactor) {
443        this.loadFactor = loadFactor;
444    }
445
446    /**
447     * @return the maximumCapacity
448     */
449    public int setMaximumBinCapacity() {
450        return maximumBinCapacity;
451    }
452
453    /**
454     * @param maximumCapacity the maximumCapacity to set
455     */
456    public void setMaximumBinCapacity(int maximumCapacity) {
457        this.maximumBinCapacity = maximumCapacity;
458    }
459
460    public synchronized int size(Transaction tx) {
461        return metadata.size;
462    }
463
464    public synchronized int getActiveBins() {
465        return metadata.binsActive;
466    }
467
468    public long getBinPageId() {
469        return metadata.binPageId;
470    }
471
472    public PageFile getPageFile() {
473        return pageFile;
474    }
475
476    public int getBinsActive() {
477        return metadata.binsActive;
478    }
479
480}