001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl.index.tree;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.io.RandomAccessFile;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    import org.apache.activemq.kaha.Marshaller;
024    import org.apache.activemq.kaha.StoreEntry;
025    import org.apache.activemq.kaha.impl.index.Index;
026    import org.apache.activemq.kaha.impl.index.IndexManager;
027    import org.apache.activemq.util.DataByteArrayInputStream;
028    import org.apache.activemq.util.DataByteArrayOutputStream;
029    import org.apache.activemq.util.IOHelper;
030    import org.apache.activemq.util.LRUCache;
031    import org.slf4j.Logger;
032    import org.slf4j.LoggerFactory;
033    
034    /**
035     * BTree implementation
036     * 
037     * 
038     */
039    public class TreeIndex implements Index {
040    
041        private static final String NAME_PREFIX = "tree-index-";
042        private static final int DEFAULT_PAGE_SIZE;
043        private static final int DEFAULT_KEY_SIZE;
044        private static final Logger LOG = LoggerFactory.getLogger(TreeIndex.class);
045        private final String name;
046        private File directory;
047        private File file;
048        private RandomAccessFile indexFile;
049        private IndexManager indexManager;
050        private int pageSize = DEFAULT_PAGE_SIZE;
051        private int keySize = DEFAULT_KEY_SIZE;
052        private int keysPerPage = pageSize / keySize;
053        private TreePage root;
054        private LRUCache<Long, TreePage> pageCache;
055        private DataByteArrayInputStream dataIn;
056        private DataByteArrayOutputStream dataOut;
057        private byte[] readBuffer;
058        private Marshaller keyMarshaller;
059        private long length;
060        private TreePage firstFree;
061        private TreePage lastFree;
062        private AtomicBoolean loaded = new AtomicBoolean();
063        private boolean enablePageCaching = true;
064        private int pageCacheSize = 10;
065    
066        /**
067         * Constructor
068         * 
069         * @param directory
070         * @param name
071         * @param indexManager
072         * @throws IOException
073         */
074        public TreeIndex(File directory, String name, IndexManager indexManager) throws IOException {
075            this.directory = directory;
076            this.name = name;
077            this.indexManager = indexManager;
078            pageCache = new LRUCache<Long, TreePage>(pageCacheSize, pageCacheSize, 0.75f, true);
079            openIndexFile();
080        }
081    
082        /**
083         * Set the marshaller for key objects
084         * 
085         * @param marshaller
086         */
087        public void setKeyMarshaller(Marshaller marshaller) {
088            this.keyMarshaller = marshaller;
089        }
090    
091        /**
092         * @return the keySize
093         */
094        public int getKeySize() {
095            return this.keySize;
096        }
097    
098        /**
099         * @param keySize the keySize to set
100         */
101        public void setKeySize(int keySize) {
102            this.keySize = keySize;
103            if (loaded.get()) {
104                throw new RuntimeException("Pages already loaded - can't reset key size");
105            }
106        }
107    
108        /**
109         * @return the pageSize
110         */
111        public int getPageSize() {
112            return this.pageSize;
113        }
114    
115        /**
116         * @param pageSize the pageSize to set
117         */
118        public void setPageSize(int pageSize) {
119            if (loaded.get() && pageSize != this.pageSize) {
120                throw new RuntimeException("Pages already loaded - can't reset page size");
121            }
122            this.pageSize = pageSize;
123        }
124    
125        public boolean isTransient() {
126            return false;
127        }
128    
129        /**
130         * @return the enablePageCaching
131         */
132        public boolean isEnablePageCaching() {
133            return this.enablePageCaching;
134        }
135    
136        /**
137         * @param enablePageCaching the enablePageCaching to set
138         */
139        public void setEnablePageCaching(boolean enablePageCaching) {
140            this.enablePageCaching = enablePageCaching;
141        }
142    
143        /**
144         * @return the pageCacheSize
145         */
146        public int getPageCacheSize() {
147            return this.pageCacheSize;
148        }
149    
150        /**
151         * @param pageCacheSize the pageCacheSize to set
152         */
153        public void setPageCacheSize(int pageCacheSize) {
154            this.pageCacheSize = pageCacheSize;
155            pageCache.setMaxCacheSize(pageCacheSize);
156        }
157    
158        public void load() {
159            if (loaded.compareAndSet(false, true)) {
160                keysPerPage = pageSize / keySize;
161                dataIn = new DataByteArrayInputStream();
162                dataOut = new DataByteArrayOutputStream(pageSize);
163                readBuffer = new byte[pageSize];
164                try {
165                    openIndexFile();
166                    long offset = 0;
167                    while ((offset + pageSize) <= indexFile.length()) {
168                        indexFile.seek(offset);
169                        indexFile.readFully(readBuffer, 0, TreePage.PAGE_HEADER_SIZE);
170                        dataIn.restart(readBuffer);
171                        TreePage page = new TreePage(keysPerPage);
172                        page.setTree(this);
173                        page.setId(offset);
174                        page.readHeader(dataIn);
175                        if (!page.isActive()) {
176                            if (lastFree != null) {
177                                lastFree.setNextFreePageId(offset);
178                                indexFile.seek(lastFree.getId());
179                                dataOut.reset();
180                                lastFree.writeHeader(dataOut);
181                                indexFile.write(dataOut.getData(), 0, TreePage.PAGE_HEADER_SIZE);
182                                lastFree = page;
183                            } else {
184                                lastFree = page;
185                                firstFree = page;
186                            }
187                        } else if (root == null && page.isRoot()) {
188                            root = getFullPage(offset);
189                        }
190                        offset += pageSize;
191                    }
192                    length = offset;
193                    if (root == null) {
194                        root = createRoot();
195                    }
196                } catch (IOException e) {
197                    LOG.error("Failed to load index ", e);
198                    throw new RuntimeException(e);
199                }
200            }
201        }
202    
203        public void unload() throws IOException {
204            if (loaded.compareAndSet(true, false)) {
205                if (indexFile != null) {
206                    indexFile.close();
207                    indexFile = null;
208                    pageCache.clear();
209                    root = null;
210                    firstFree = null;
211                    lastFree = null;
212                }
213            }
214        }
215    
216        public void store(Object key, StoreEntry value) throws IOException {
217            TreeEntry entry = new TreeEntry();
218            entry.setKey((Comparable)key);
219            entry.setIndexOffset(value.getOffset());
220            root.put(entry);
221        }
222    
223        public StoreEntry get(Object key) throws IOException {
224            TreeEntry entry = new TreeEntry();
225            entry.setKey((Comparable)key);
226            TreeEntry result = root.find(entry);
227            return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
228        }
229    
230        public StoreEntry remove(Object key) throws IOException {
231            TreeEntry entry = new TreeEntry();
232            entry.setKey((Comparable)key);
233            TreeEntry result = root.remove(entry);
234            return result != null ? indexManager.getIndex(result.getIndexOffset()) : null;
235        }
236    
237        public boolean containsKey(Object key) throws IOException {
238            TreeEntry entry = new TreeEntry();
239            entry.setKey((Comparable)key);
240            return root.find(entry) != null;
241        }
242    
243        public void clear() throws IOException {
244            unload();
245            delete();
246            openIndexFile();
247            load();
248        }
249    
250        public void delete() throws IOException {
251            unload();
252            if (file.exists()) {
253                boolean result = file.delete();
254            }
255            length = 0;
256        }
257    
258        /**
259         * @return the root
260         */
261        TreePage getRoot() {
262            return this.root;
263        }
264    
265        TreePage lookupPage(long pageId) throws IOException {
266            TreePage result = null;
267            if (pageId >= 0) {
268                if (root != null && root.getId() == pageId) {
269                    result = root;
270                } else {
271                    result = getFromCache(pageId);
272                }
273                if (result == null) {
274                    result = getFullPage(pageId);
275                    if (result != null) {
276                        if (result.isActive()) {
277                            addToCache(result);
278                        } else {
279                            throw new IllegalStateException("Trying to access an inactive page: " + pageId + " root is " + root);
280                        }
281                    }
282                }
283            }
284            return result;
285        }
286    
287        TreePage createRoot() throws IOException {
288            TreePage result = createPage(-1);
289            root = result;
290            return result;
291        }
292    
293        TreePage createPage(long parentId) throws IOException {
294            TreePage result = getNextFreePage();
295            if (result == null) {
296                // allocate one
297                result = new TreePage(keysPerPage);
298                result.setId(length);
299                result.setTree(this);
300                result.setParentId(parentId);
301                writePage(result);
302                length += pageSize;
303                indexFile.seek(length);
304                indexFile.write(TreeEntry.NOT_SET);
305            }
306            addToCache(result);
307            return result;
308        }
309    
310        void releasePage(TreePage page) throws IOException {
311            removeFromCache(page);
312            page.reset();
313            page.setActive(false);
314            if (lastFree == null) {
315                firstFree = page;
316                lastFree = page;
317            } else {
318                lastFree.setNextFreePageId(page.getId());
319                writePage(lastFree);
320            }
321            writePage(page);
322        }
323    
324        private TreePage getNextFreePage() throws IOException {
325            TreePage result = null;
326            if (firstFree != null) {
327                if (firstFree.equals(lastFree)) {
328                    result = firstFree;
329                    firstFree = null;
330                    lastFree = null;
331                } else {
332                    result = firstFree;
333                    firstFree = getPage(firstFree.getNextFreePageId());
334                    if (firstFree == null) {
335                        lastFree = null;
336                    }
337                }
338                result.setActive(true);
339                result.reset();
340                result.saveHeader();
341            }
342            return result;
343        }
344    
345        void writeFullPage(TreePage page) throws IOException {
346            dataOut.reset();
347            page.write(keyMarshaller, dataOut);
348            if (dataOut.size() > pageSize) {
349                throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
350            }
351            indexFile.seek(page.getId());
352            indexFile.write(dataOut.getData(), 0, dataOut.size());
353        }
354    
355        void writePage(TreePage page) throws IOException {
356            dataOut.reset();
357            page.writeHeader(dataOut);
358            indexFile.seek(page.getId());
359            indexFile.write(dataOut.getData(), 0, TreePage.PAGE_HEADER_SIZE);
360        }
361    
362        TreePage getFullPage(long id) throws IOException {
363            indexFile.seek(id);
364            indexFile.readFully(readBuffer, 0, pageSize);
365            dataIn.restart(readBuffer);
366            TreePage page = new TreePage(keysPerPage);
367            page.setId(id);
368            page.setTree(this);
369            page.read(keyMarshaller, dataIn);
370            return page;
371        }
372    
373        TreePage getPage(long id) throws IOException {
374            indexFile.seek(id);
375            indexFile.readFully(readBuffer, 0, TreePage.PAGE_HEADER_SIZE);
376            dataIn.restart(readBuffer);
377            TreePage page = new TreePage(keysPerPage);
378            page.setId(id);
379            page.setTree(this);
380            page.readHeader(dataIn);
381            return page;
382        }
383    
384        private TreePage getFromCache(long pageId) {
385            TreePage result = null;
386            if (enablePageCaching) {
387                result = pageCache.get(pageId);
388            }
389            return result;
390        }
391    
392        private void addToCache(TreePage page) {
393            if (enablePageCaching) {
394                pageCache.put(page.getId(), page);
395            }
396        }
397    
398        private void removeFromCache(TreePage page) {
399            if (enablePageCaching) {
400                pageCache.remove(page.getId());
401            }
402        }
403    
404        protected void openIndexFile() throws IOException {
405            if (indexFile == null) {
406                file = new File(directory, NAME_PREFIX + IOHelper.toFileSystemSafeName(name));
407                IOHelper.mkdirs(file.getParentFile());
408                indexFile = new RandomAccessFile(file, "rw");
409            }
410        }
411    
412        static {
413            DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "16384"));
414            DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
415        }
416    
417        public int getSize() {
418            return 0;
419        }
420    }