001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl.index.hash;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.io.RandomAccessFile;
022    import java.util.LinkedList;
023    import java.util.concurrent.atomic.AtomicBoolean;
024    
025    import org.apache.activemq.kaha.Marshaller;
026    import org.apache.activemq.kaha.StoreEntry;
027    import org.apache.activemq.kaha.impl.index.Index;
028    import org.apache.activemq.kaha.impl.index.IndexManager;
029    import org.apache.activemq.util.DataByteArrayInputStream;
030    import org.apache.activemq.util.DataByteArrayOutputStream;
031    import org.apache.activemq.util.IOHelper;
032    import org.apache.activemq.util.LRUCache;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    /**
037     * BTree implementation
038     * 
039     * 
040     */
041    public class HashIndex implements Index, HashIndexMBean {
042        public static final int DEFAULT_PAGE_SIZE;
043        public static final int DEFAULT_KEY_SIZE;
044        public static final int DEFAULT_BIN_SIZE;
045        public static final int MAXIMUM_CAPACITY;
046        public static final int DEFAULT_LOAD_FACTOR;
047        private static final int LOW_WATER_MARK=1024*16;
048        private static final String NAME_PREFIX = "hash-index-";
049        private static final Logger LOG = LoggerFactory.getLogger(HashIndex.class);
050        private final String name;
051        private File directory;
052        private File file;
053        private RandomAccessFile indexFile;
054        private IndexManager indexManager;
055        private int pageSize = DEFAULT_PAGE_SIZE;
056        private int keySize = DEFAULT_KEY_SIZE;
057        private int numberOfBins = DEFAULT_BIN_SIZE;
058        private int keysPerPage = this.pageSize /this.keySize;
059        private DataByteArrayInputStream dataIn;
060        private DataByteArrayOutputStream dataOut;
061        private byte[] readBuffer;
062        private HashBin[] bins;
063        private Marshaller keyMarshaller;
064        private long length;
065        private LinkedList<HashPage> freeList = new LinkedList<HashPage>();
066        private AtomicBoolean loaded = new AtomicBoolean();
067        private LRUCache<Long, HashPage> pageCache;
068        private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
069        private int pageCacheSize = 10;
070        private int size;
071        private int highestSize=0;
072        private int activeBins;
073        private int threshold;
074        private int maximumCapacity=MAXIMUM_CAPACITY;
075        private int loadFactor=DEFAULT_LOAD_FACTOR;
076        
077        
078        /**
079         * Constructor
080         * 
081         * @param directory
082         * @param name
083         * @param indexManager
084         * @throws IOException
085         */
086        public HashIndex(File directory, String name, IndexManager indexManager) throws IOException {
087            this.directory = directory;
088            this.name = name;
089            this.indexManager = indexManager;
090            openIndexFile();
091            pageCache = new LRUCache<Long, HashPage>(pageCacheSize, pageCacheSize, 0.75f, true);
092        }
093    
094        /**
095         * Set the marshaller for key objects
096         * 
097         * @param marshaller
098         */
099        public synchronized void setKeyMarshaller(Marshaller marshaller) {
100            this.keyMarshaller = marshaller;
101        }
102    
103        /**
104         * @return the keySize
105         */
106        public synchronized int getKeySize() {
107            return this.keySize;
108        }
109    
110        /**
111         * @param keySize the keySize to set
112         */
113        public synchronized void setKeySize(int keySize) {
114            this.keySize = keySize;
115            if (loaded.get()) {
116                throw new RuntimeException("Pages already loaded - can't reset key size");
117            }
118        }
119    
120        /**
121         * @return the pageSize
122         */
123        public synchronized int getPageSize() {
124            return this.pageSize;
125        }
126    
127        /**
128         * @param pageSize the pageSize to set
129         */
130        public synchronized void setPageSize(int pageSize) {
131            if (loaded.get() && pageSize != this.pageSize) {
132                throw new RuntimeException("Pages already loaded - can't reset page size");
133            }
134            this.pageSize = pageSize;
135        }
136        
137        /**
138         * @return number of bins
139         */
140        public int getNumberOfBins() {
141            return this.numberOfBins;
142        }
143    
144        /**
145         * @param numberOfBins
146         */
147        public void setNumberOfBins(int numberOfBins) {
148            if (loaded.get() && numberOfBins != this.numberOfBins) {
149                throw new RuntimeException("Pages already loaded - can't reset bin size");
150            }
151            this.numberOfBins = numberOfBins;
152        }
153    
154        /**
155         * @return the enablePageCaching
156         */
157        public synchronized boolean isEnablePageCaching() {
158            return this.enablePageCaching;
159        }
160    
161        /**
162         * @param enablePageCaching the enablePageCaching to set
163         */
164        public synchronized void setEnablePageCaching(boolean enablePageCaching) {
165            this.enablePageCaching = enablePageCaching;
166        }
167    
168        /**
169         * @return the pageCacheSize
170         */
171        public synchronized int getPageCacheSize() {
172            return this.pageCacheSize;
173        }
174    
175        /**
176         * @param pageCacheSize the pageCacheSize to set
177         */
178        public synchronized void setPageCacheSize(int pageCacheSize) {
179            this.pageCacheSize = pageCacheSize;
180            pageCache.setMaxCacheSize(pageCacheSize);
181        }
182    
183        public synchronized boolean isTransient() {
184            return false;
185        }
186        
187        /**
188         * @return the threshold
189         */
190        public int getThreshold() {
191            return threshold;
192        }
193    
194        /**
195         * @param threshold the threshold to set
196         */
197        public void setThreshold(int threshold) {
198            this.threshold = threshold;
199        }
200    
201        /**
202         * @return the loadFactor
203         */
204        public int getLoadFactor() {
205            return loadFactor;
206        }
207    
208        /**
209         * @param loadFactor the loadFactor to set
210         */
211        public void setLoadFactor(int loadFactor) {
212            this.loadFactor = loadFactor;
213        }
214        
215        /**
216         * @return the maximumCapacity
217         */
218        public int getMaximumCapacity() {
219            return maximumCapacity;
220        }
221    
222        /**
223         * @param maximumCapacity the maximumCapacity to set
224         */
225        public void setMaximumCapacity(int maximumCapacity) {
226            this.maximumCapacity = maximumCapacity;
227        }
228        
229        public synchronized int getSize() {
230            return size;
231        }
232        
233        public synchronized int getActiveBins(){
234            return activeBins;
235        }
236    
237        public synchronized void load() {
238            if (loaded.compareAndSet(false, true)) {
239                int capacity = 1;
240                while (capacity < numberOfBins) {
241                    capacity <<= 1;
242                }
243                this.bins = new HashBin[capacity];
244                this.numberOfBins=capacity;
245                threshold = calculateThreashold();
246                keysPerPage = pageSize / keySize;
247                dataIn = new DataByteArrayInputStream();
248                dataOut = new DataByteArrayOutputStream(pageSize);
249                readBuffer = new byte[pageSize];
250                try {
251                    openIndexFile();
252                    if (indexFile.length() > 0) {
253                        doCompress();
254                    }
255                } catch (IOException e) {
256                    LOG.error("Failed to load index ", e);
257                    throw new RuntimeException(e);
258                }
259            }
260        }    
261    
262        public synchronized void unload() throws IOException {
263            if (loaded.compareAndSet(true, false)) {
264                if (indexFile != null) {
265                    indexFile.close();
266                    indexFile = null;
267                    freeList.clear();
268                    pageCache.clear();
269                    bins = new HashBin[bins.length];
270                }
271            }
272        }
273    
274        public synchronized void store(Object key, StoreEntry value) throws IOException {
275            load();
276            HashEntry entry = new HashEntry();
277            entry.setKey((Comparable)key);
278            entry.setIndexOffset(value.getOffset());
279            if (!getBin(key).put(entry)) {
280                this.size++;
281            }
282            if (this.size >= this.threshold) {
283                resize(2*bins.length);
284            }
285            if(this.size > this.highestSize) {
286                this.highestSize=this.size;
287            }
288        }
289    
290        public synchronized StoreEntry get(Object key) throws IOException {
291            load();
292            HashEntry entry = new HashEntry();
293            entry.setKey((Comparable)key);
294            HashEntry result = getBin(key).find(entry);
295            return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
296        }
297    
298        public synchronized StoreEntry remove(Object key) throws IOException {
299            load();
300            StoreEntry result = null;
301            HashEntry entry = new HashEntry();
302            entry.setKey((Comparable)key);
303            HashEntry he = getBin(key).remove(entry);
304            if (he != null) {
305                this.size--;
306                result = this.indexManager.getIndex(he.getIndexOffset());
307            }
308            if (this.highestSize > LOW_WATER_MARK &&  this.highestSize > (this.size *2)) {
309                int newSize = this.size/this.keysPerPage;
310                newSize = Math.max(128, newSize);
311                this.highestSize=0;
312                resize(newSize);
313                
314            }
315            return result;
316        }
317    
318        public synchronized boolean containsKey(Object key) throws IOException {
319            return get(key) != null;
320        }
321    
322        public synchronized void clear() throws IOException {
323            unload();
324            delete();
325            openIndexFile();
326            load();
327        }
328    
329        public synchronized void delete() throws IOException {
330            unload();
331            if (file.exists()) {
332                file.delete();
333            }
334            length = 0;
335        }
336    
337        HashPage lookupPage(long pageId) throws IOException {
338            HashPage result = null;
339            if (pageId >= 0) {
340                result = getFromCache(pageId);
341                if (result == null) {
342                    result = getFullPage(pageId);
343                    if (result != null) {
344                        if (result.isActive()) {
345                            addToCache(result);
346                        } else {
347                            throw new IllegalStateException("Trying to access an inactive page: " + pageId);
348                        }
349                    }
350                }
351            }
352            return result;
353        }
354    
355        HashPage createPage(int binId) throws IOException {
356            HashPage result = getNextFreePage();
357            if (result == null) {  
358                // allocate one
359                result = new HashPage(keysPerPage);
360                result.setId(length);
361                result.setBinId(binId);
362                writePageHeader(result);
363                length += pageSize;
364                indexFile.seek(length);
365                indexFile.write(HashEntry.NOT_SET);
366            }
367            addToCache(result);
368            return result;
369        }
370    
371        void releasePage(HashPage page) throws IOException {
372            removeFromCache(page);
373            page.reset();
374            page.setActive(false);
375            writePageHeader(page);
376            freeList.add(page);
377        }
378    
379        private HashPage getNextFreePage() throws IOException {
380            HashPage result = null;
381            if(!freeList.isEmpty()) {
382                result = freeList.removeFirst();
383                result.setActive(true);
384                result.reset();
385                writePageHeader(result);
386            }
387            return result;
388        }
389    
390        void writeFullPage(HashPage page) throws IOException {
391            dataOut.reset();
392            page.write(keyMarshaller, dataOut);
393            if (dataOut.size() > pageSize) {
394                throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
395            }
396            indexFile.seek(page.getId());
397            indexFile.write(dataOut.getData(), 0, dataOut.size());
398        }
399    
400        void writePageHeader(HashPage page) throws IOException {
401            dataOut.reset();
402            page.writeHeader(dataOut);
403            indexFile.seek(page.getId());
404            indexFile.write(dataOut.getData(), 0, HashPage.PAGE_HEADER_SIZE);
405        }
406    
407        HashPage getFullPage(long id) throws IOException {
408            indexFile.seek(id);
409            indexFile.readFully(readBuffer, 0, pageSize);
410            dataIn.restart(readBuffer);
411            HashPage page = new HashPage(keysPerPage);
412            page.setId(id);
413            page.read(keyMarshaller, dataIn);
414            return page;
415        }
416    
417        HashPage getPageHeader(long id) throws IOException {
418            indexFile.seek(id);
419            indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
420            dataIn.restart(readBuffer);
421            HashPage page = new HashPage(keysPerPage);
422            page.setId(id);
423            page.readHeader(dataIn);
424            return page;
425        }
426    
427        void addToBin(HashPage page) throws IOException {
428            int index = page.getBinId();
429            if (index >= this.bins.length) {
430                resize(index+1);
431            }
432            HashBin bin = getBin(index);
433            bin.addHashPageInfo(page.getId(), page.getPersistedSize());
434        }
435    
436        private HashBin getBin(int index) {
437            
438            HashBin result = bins[index];
439            if (result == null) {
440                result = new HashBin(this, index, pageSize / keySize);
441                bins[index] = result;
442                activeBins++;
443            }
444            return result;
445        }
446    
447        private void openIndexFile() throws IOException {
448            if (indexFile == null) {
449                file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name));
450                IOHelper.mkdirs(file.getParentFile());
451                indexFile = new RandomAccessFile(file, "rw");
452            }
453        }
454        
455        private HashBin getBin(Object key) {
456            int hash = hash(key);
457            int i = indexFor(hash, bins.length);
458            return getBin(i);
459        }
460    
461        private HashPage getFromCache(long pageId) {
462            HashPage result = null;
463            if (enablePageCaching) {
464                result = pageCache.get(pageId);
465            }
466            return result;
467        }
468    
469        private void addToCache(HashPage page) {
470            if (enablePageCaching) {
471                pageCache.put(page.getId(), page);
472            }
473        }
474    
475        private void removeFromCache(HashPage page) {
476            if (enablePageCaching) {
477                pageCache.remove(page.getId());
478            }
479        }
480        
481        private void doLoad() throws IOException {
482            long offset = 0;
483            if (loaded.compareAndSet(false, true)) {
484                while ((offset + pageSize) <= indexFile.length()) {
485                    indexFile.seek(offset);
486                    indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
487                    dataIn.restart(readBuffer);
488                    HashPage page = new HashPage(keysPerPage);
489                    page.setId(offset);
490                    page.readHeader(dataIn);
491                    if (!page.isActive()) {
492                        page.reset();
493                        freeList.add(page);
494                    } else {
495                        addToBin(page);
496                        size+=page.size();
497                    }
498                    offset += pageSize;
499                }
500                length=offset;
501            }
502        }
503        
504        private void doCompress() throws IOException {
505            String backFileName = name + "-COMPRESS";
506            HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
507            backIndex.setKeyMarshaller(keyMarshaller);
508            backIndex.setKeySize(getKeySize());
509            backIndex.setNumberOfBins(getNumberOfBins());
510            backIndex.setPageSize(getPageSize());
511            backIndex.load();
512            File backFile = backIndex.file;
513            long offset = 0;
514            while ((offset + pageSize) <= indexFile.length()) {
515                indexFile.seek(offset);
516                HashPage page = getFullPage(offset);
517                if (page.isActive()) {
518                    for (HashEntry entry : page.getEntries()) {
519                        backIndex.getBin(entry.getKey()).put(entry);
520                        backIndex.size++;
521                    }
522                }
523                page=null;
524                offset += pageSize;
525            }
526            backIndex.unload();
527          
528            unload();
529            IOHelper.deleteFile(file);
530            IOHelper.copyFile(backFile, file);
531            IOHelper.deleteFile(backFile);
532            openIndexFile();
533            doLoad();
534        }
535        
536        private void resize(int newCapacity) throws IOException {
537            if (bins.length < getMaximumCapacity()) {
538                if (newCapacity != numberOfBins) {
539                    int capacity = 1;
540                    while (capacity < newCapacity) {
541                        capacity <<= 1;
542                    }
543                    newCapacity=capacity;
544                    if (newCapacity != numberOfBins) {
545                        LOG.info("Resize hash bins " + this.name + " from " + numberOfBins + " to " + newCapacity);
546                        String backFileName = name + "-REISZE";
547                        HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
548                        backIndex.setKeyMarshaller(keyMarshaller);
549                        backIndex.setKeySize(getKeySize());
550                        backIndex.setNumberOfBins(newCapacity);
551                        backIndex.setPageSize(getPageSize());
552                        backIndex.load();
553                        File backFile = backIndex.file;
554                        long offset = 0;
555                        while ((offset + pageSize) <= indexFile.length()) {
556                            indexFile.seek(offset);
557                            HashPage page = getFullPage(offset);
558                            if (page.isActive()) {
559                                for (HashEntry entry : page.getEntries()) {
560                                    backIndex.getBin(entry.getKey()).put(entry);
561                                    backIndex.size++;
562                                }
563                            }
564                            page=null;
565                            offset += pageSize;
566                        }
567                        backIndex.unload();
568                      
569                        unload();
570                        IOHelper.deleteFile(file);
571                        IOHelper.copyFile(backFile, file);
572                        IOHelper.deleteFile(backFile);
573                        setNumberOfBins(newCapacity);
574                        bins = new HashBin[newCapacity];
575                        threshold = calculateThreashold();
576                        openIndexFile();
577                        doLoad();
578                    }
579                }
580            }else {
581                threshold = Integer.MAX_VALUE;
582                return;
583            }
584        }
585        
586        private int calculateThreashold() {
587            return (int)(bins.length * loadFactor);
588        }
589        
590        
591        public String toString() {
592            String str = "HashIndex"+System.identityHashCode(this)+": "+file.getName();
593            return str;
594        }
595          
596    
597        static int hash(Object x) {
598            int h = x.hashCode();
599            h += ~(h << 9);
600            h ^= h >>> 14;
601            h += h << 4;
602            h ^= h >>> 10;
603            return h;
604        }
605    
606        static int indexFor(int h, int length) {
607            return h & (length - 1);
608        }
609    
610        static {
611            DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "1024"));
612            DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
613            DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
614            MAXIMUM_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
615            DEFAULT_LOAD_FACTOR=Integer.parseInt(System.getProperty("defaultLoadFactor","50"));
616        }
617    }