/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LFUCache;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A PageFile provides random access to fixed-size disk pages. This object is not thread safe, so access to it
 * should be externally synchronized.
 * <p/>
 * The file has 3 parts:
 * Metadata Space: 4k : Reserved metadata area, used to store persistent config about the file.
 * Recovery Buffer Space: Page Size * 1000 : A redo log used to prevent partial page writes from making the file inconsistent.
 * Page Space: The pages in the page file.
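 * <p/>
 * A minimal usage sketch (assumes the Transaction API from this package, e.g. Transaction.allocate() and
 * Transaction.commit(); error handling omitted):
 * <pre>
 * PageFile pageFile = new PageFile(new File("data"), "example");
 * pageFile.load();                 // open (or create) the .data/.redo/.free files
 * Transaction tx = pageFile.tx();  // page reads and updates go through a transaction
 * Page page = tx.allocate();       // grab a free page to write to
 * tx.commit();                     // queue the page updates for batched writing
 * pageFile.flush();                // force buffered writes (and a disk sync) now
 * pageFile.unload();               // record a clean shutdown and release file handles
 * </pre>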
 */
public class PageFile {

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";

    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024 * 4);
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);

    private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
    private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;

    // The recovery file header is (long nextTxId)(long checksum)(int pageCount);
    // each recovery record that follows is (long pageId)(page data).
    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);

    // A PageFile will use a couple of files in this directory.
    private File directory;
    // And the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages..
    private RandomAccessFile readFile;
    // File handle used for writing pages..
    private RandomAccessFile writeFile;
    // File handle used for writing to the recovery file..
    private RandomAccessFile recoveryFile;

    // The size of each page, in bytes.
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum amount of space allocated to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The max size that we let the recovery file grow to. It may temporarily exceed the max,
    // but the file will get resized back down to this max size as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer.
    private int recoveryPageCount;

    private AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // The cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is the read page cache enabled?
    private boolean enablePageCaching = true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;

    // Should we first log the page write to the recovery buffer? Avoids partial
    // page writes corrupting the page file..
    private boolean enableRecoveryFile = true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs = true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread = false;

    // These are used if enabledWriteThread==true.
    private AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private AtomicLong nextTxid = new AtomicLong();

    // Persistent settings stored in the page file.
    private MetaData metaData;

    private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();

    private boolean useLFRUEviction = false;
    private float LFUEvictionFactor = 0.2f;

    /**
     * Used to keep track of updated pages which have not yet been written out to disk.
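     * <p/>
     * Each PageWrite double-buffers the page image: {@code current} holds the
     * latest update (and may still be replaced), while begin() moves it to
     * {@code diskBound} so a newer update can arrive while the old image is
     * being written. Large transactions spill the image to a temp file and
     * track it by (location, length) instead of keeping the bytes in memory.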
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;
        long currentLocation = -1;
        long diskBoundLocation = -1;
        File tmpFile;
        int length;

        public PageWrite(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
            this.page = page;
            this.currentLocation = currentLocation;
            this.tmpFile = tmpFile;
            this.length = length;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page = page;
            current = data;
            currentLocation = -1;
            diskBoundLocation = -1;
        }

        public void setCurrentLocation(Page page, long location, int length) {
            this.page = page;
            this.currentLocation = location;
            this.length = length;
            this.current = null;
        }

        @Override
        public String toString() {
            return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

        public byte[] getDiskBound() throws IOException {
            if (diskBound == null && diskBoundLocation != -1) {
                diskBound = new byte[length];
                RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
                try {
                    file.seek(diskBoundLocation);
                    // Use readFully so a short read cannot hand back a partially filled buffer.
                    file.readFully(diskBound);
                } finally {
                    file.close();
                }
                diskBoundLocation = -1;
            }
            return diskBound;
        }

        void begin() {
            if (currentLocation != -1) {
                diskBoundLocation = currentLocation;
            } else {
                diskBound = current;
            }
            current = null;
            currentLocation = -1;
        }

        /**
         * @return true if there are no pending writes to do.
         */
        boolean done() {
            diskBoundLocation = -1;
            diskBound = null;
            return current == null && currentLocation == -1;
        }

        boolean isDone() {
            return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
        }
    }

    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId = -1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }

        public void setFileType(String fileType) {
            this.fileType = fileType;
        }

        public String getFileTypeVersion() {
            return fileTypeVersion;
        }

        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }

        public long getMetaDataTxId() {
            return metaDataTxId;
        }

        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }

        public int getPageSize() {
            return pageSize;
        }

        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }

        public boolean isCleanShutdown() {
            return cleanShutdown;
        }

        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }

        public long getLastTxId() {
            return lastTxId;
        }

        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }

        public long getFreePages() {
            return freePages;
        }

        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

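    /**
     * @return a new Transaction bound to this PageFile. All page reads and
     *         updates should be performed through a Transaction.
     */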
    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }

    /**
     * Creates a PageFile in the specified directory whose data files are named by the given name.
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
     *
     * @throws IOException           if the files cannot be deleted.
     * @throws IllegalStateException if this PageFile is loaded
     */
    public void delete() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    public void archive() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot archive page file data when the page file is loaded");
        }
        long timestamp = System.currentTimeMillis();
        archive(getMainPageFile(), String.valueOf(timestamp));
        archive(getFreeFile(), String.valueOf(timestamp));
        archive(getRecoveryFile(), String.valueOf(timestamp));
    }

    /**
     * @param file
     * @throws IOException
     */
    private void delete(File file) throws IOException {
        if (file.exists() && !file.delete()) {
            throw new IOException("Could not delete: " + file.getPath());
        }
    }

    private void archive(File file, String suffix) throws IOException {
        if (file.exists()) {
            File archive = new File(file.getPath() + "-" + suffix);
            if (!file.renameTo(archive)) {
                throw new IOException("Could not archive: " + file.getPath() + " to " + archive.getPath());
            }
        }
    }

    /**
     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the
     * first time the page file is loaded, then this creates the page file in the file system.
     *
     * @throws IOException           If the page file cannot be loaded. This could be because the existing page file is corrupt,
     *                               is a bad version, or because there was a disk error.
     * @throws IllegalStateException If the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if (enablePageCaching) {
                if (isUseLFRUEviction()) {
                    pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
                } else {
                    pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
                }
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RandomAccessFile(file, "rw");
            readFile = new RandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting because that can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting because that can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if (enableRecoveryFile) {
                recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
            }

            if (metaData.isCleanShutdown()) {
                nextTxid.set(metaData.getLastTxId() + 1);
                if (metaData.getFreePages() > 0) {
                    loadFreeList();
                }
            } else {
                LOG.debug(toString() + ", Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());

                // Scan all pages to rebuild the free list.
                freeList = new SequenceSet();
                for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
                    Page page = i.next();
                    if (page.getType() == Page.PAGE_FREE_TYPE) {
                        freeList.add(page.getPageId());
                    }
                }
            }

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();

            if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);
            startWriter();

        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }


    /**
     * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException           if a disk error occurred while closing down the page file.
     * @throws IllegalStateException if the PageFile is not loaded
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if (freeList.isEmpty()) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId(nextTxid.get() - 1);
            metaData.setCleanShutdown(true);
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile = null;
                if (enableRecoveryFile) {
                    recoveryFile.close();
                    recoveryFile = null;
                }
                freeList.clear();
                if (pageCache != null) {
                    pageCache = null;
                }
                synchronized (writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException If a disk error occurred.
     */
    public void flush() throws IOException {

        if (enabledWriteThread && stopWriter.get()) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        // Set up a latch that gets notified when all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized (writes) {
            if (writes.isEmpty()) {
                return;
            }
            if (enabledWriteThread) {
                if (this.checkpointLatch == null) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            checkpointLatch.await();
        } catch (InterruptedException e) {
            InterruptedIOException ioe = new InterruptedIOException();
            ioe.initCause(e);
            throw ioe;
        }
    }


    @Override
    public String toString() {
        return "Page File: " + getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
    }

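    /**
     * Maps a page id to its byte offset in the main page file. For example, with
     * the default 4k header and 4k pages, page 5 starts at 4096 + 5 * 4096 = 24576.
     */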
    public long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
    }

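    // The 4k page file header stores two redundant copies of the metadata, one in
    // each 2k half. storeMetaData() writes and syncs both copies in turn, so a
    // crash mid-write leaves at least one intact copy for loadMetaData() to pick.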
    private void loadMetaData() throws IOException {

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }

        if (v1 == null && v2 == null) {
            throw new IOException("Could not load page file meta data");
        }

        if (v1 == null || v1.metaDataTxId < 0) {
            metaData = v2;
        } else if (v2 == null || v2.metaDataTxId < 0) {
            metaData = v1;
        } else if (v1.metaDataTxId == v2.metaDataTxId) {
            metaData = v1; // use the first since the 2nd could be a partial..
        } else {
            metaData = v2; // use the second because the first is probably a partial.
        }
    }

    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format.
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
            throw new IOException("Configuration is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
        }
        // Fill the rest with spaces...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
        Arrays.fill(filler, (byte) ' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it.. write it 2 times...
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.getFD().sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
        writeFile.write(d);
        writeFile.getFD().sync();
    }

    private void storeFreeList() throws IOException {
        FileOutputStream os = new FileOutputStream(getFreeFile());
        DataOutputStream dos = new DataOutputStream(os);
        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        dos.close();
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        FileInputStream is = new FileInputStream(getFreeFile());
        DataInputStream dis = new DataInputStream(is);
        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        dis.close();
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double buffer page writes?  Enabled by default.
     *
     * @return is the recovery buffer enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes.  Enabled by default.  Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize - Page.PAGE_HEADER_SIZE;
    }

    /**
     * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public long getFreePageCount() {
        assertLoaded();
        return freeList.rangeSize();
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public float getLFUEvictionFactor() {
        return LFUEvictionFactor;
    }

    public void setLFUEvictionFactor(float LFUEvictionFactor) {
        this.LFUEvictionFactor = LFUEvictionFactor;
    }

    public boolean isUseLFRUEviction() {
        return useLFRUEviction;
    }

    public void setUseLFRUEviction(boolean useLFRUEviction) {
        this.useLFRUEviction = useLFRUEviction;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if (!loaded.get()) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if (loaded.get()) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException           If a disk error occurred.
     * @throws IllegalStateException if the PageFile is not loaded
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;

            // Reserve the page ids and write tx ids only once....
            long pageId = nextFreePageId.getAndAdd(count);
            long writeTxnId = nextTxid.getAndAdd(count);

            while (c-- > 0) {
                Page<T> page = new Page<T>(pageId++);
                page.makeFree(writeTxnId++);

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    synchronized void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        removeFromCache(pageId);
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
            public Long getKey() {
                return write.getPage().getPageId();
            }

            public PageWrite getValue() {
                return write;
            }

            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized (writes) {
            if (enabledWriteThread) {
                while (writes.size() >= writeBatchSize && !stopWriter.get()) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            boolean longTx = false;

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if (write == null) {
                    writes.put(key, value);
                } else {
                    if (value.currentLocation != -1) {
                        write.setCurrentLocation(value.page, value.currentLocation, value.length);
                        write.tmpFile = value.tmpFile;
                        longTx = true;
                    } else {
                        write.setCurrent(value.page, value.current);
                    }
                }
            }

            // Once we start approaching capacity, notify the writer to start writing.
            // Write synchronously right away for long txs.
            if (longTx || canStartWriteBatch()) {

                if (enabledWriteThread) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }

    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100) / writeBatchSize);
        if (enabledWriteThread) {
            // The constant 10 here controls how soon write batches start going to disk..
            // it would be nice to figure out how to auto tune that value.  Make it too small and
            // we reduce throughput because we lock the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch != null;
        } else {
            return capacityUsed >= 80 || checkpointLatch != null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////
    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized (writes) {
            PageWrite pageWrite = writes.get(pageId);
            if (pageWrite != null) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(long pageId) {
        if (enablePageCaching) {
            pageCache.remove(pageId);
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal Double write implementation follows...
    ///////////////////////////////////////////////////////////////////

    private void pollWrites() {
        try {
            while (!stopWriter.get()) {
                // Wait for a notification...
                synchronized (writes) {
                    writes.notifyAll();

                    // If there is not enough to write, wait for a notification...
                    while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
                        writes.wait(100);
                    }

                    if (writes.isEmpty()) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            LOG.info("An exception was raised while performing poll writes", e);
        } finally {
            releaseCheckpointWaiter();
        }
    }

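    /**
     * Writes out the current batch of page updates using a double write: for
     * each page the image is first appended to the recovery (redo) file, then
     * written in place in the main page file; the recovery header (next tx id,
     * batch checksum, page count) is recorded and both files are synced. If a
     * crash interrupts the in-place writes, redoRecoveryUpdates() replays the
     * checksummed batch from the recovery file on the next load.
     */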
    private void writeBatch() throws IOException {

        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized (writes) {
            batch = new ArrayList<PageWrite>(writes.size());
            // Build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write, this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null && write.diskBoundLocation == -1) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch = null;
        }

        Checksum checksum = new Adler32();
        if (enableRecoveryFile) {
            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        }
        for (PageWrite w : batch) {
            if (enableRecoveryFile) {
                try {
                    checksum.update(w.getDiskBound(), 0, pageSize);
                } catch (Throwable t) {
                    throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
                }
                recoveryFile.writeLong(w.page.getPageId());
                recoveryFile.write(w.getDiskBound(), 0, pageSize);
            }

            writeFile.seek(toOffset(w.page.getPageId()));
            writeFile.write(w.getDiskBound(), 0, pageSize);
            w.done();
        }

        try {
            if (enableRecoveryFile) {
                // Can we shrink the recovery buffer??
                if (recoveryPageCount > recoveryFileMaxPageCount) {
                    int t = Math.max(recoveryFileMinPageCount, batch.size());
                    recoveryFile.setLength(recoveryFileSizeForPages(t));
                }

                // Record the page writes in the recovery buffer.
                recoveryFile.seek(0);
                // Store the next tx id...
                recoveryFile.writeLong(nextTxid.get());
                // Store the checksum for the write batch so that on recovery we
                // know if we have a consistent write batch on disk.
                recoveryFile.writeLong(checksum.getValue());
                // Write the # of pages that will follow.
                recoveryFile.writeInt(batch.size());
            }

            if (enableDiskSyncs) {
                // Sync to make sure recovery buffer writes land on disk..
                if (enableRecoveryFile) {
                    recoveryFile.getFD().sync();
                }
                writeFile.getFD().sync();
            }
        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                        if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
                            if (!w.tmpFile.delete()) {
                                throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
                            }
                            tmpFilesForRemoval.remove(w.tmpFile);
                        }
                    }
                }
            }

            if (checkpointLatch != null) {
                checkpointLatch.countDown();
            }
        }
    }

    public void removeTmpFile(File file) {
        tmpFilesForRemoval.add(file);
    }

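    /**
     * Each recovery record is an 8 byte page id followed by one page of data,
     * hence (pageSize + 8) per page, plus the fixed size recovery file header.
     */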
    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
    }

    private void releaseCheckpointWaiter() {
        if (checkpointLatch != null) {
            checkpointLatch.countDown();
            checkpointLatch = null;
        }
    }

    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
     *
     * @return the next transaction id that can be used.
     */
    private long redoRecoveryUpdates() throws IOException {
        if (!enableRecoveryFile) {
            return 0;
        }
        recoveryPageCount = 0;

        // Are we initializing the recovery file?
        if (recoveryFile.length() == 0) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }

        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long pageId = recoveryFile.readLong();
                byte[] data = new byte[pageSize];
                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
                    // Invalid recovery record, could not fully read the data. Probably due to a partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(pageId, data);
            }
        } catch (Exception e) {
            // If an error occurred it was because the redo buffer was not fully written out correctly.. so don't redo it,
            // as the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid then the recovery buffer was only partially written to disk.
        if (checksum.getValue() != expectedChecksum) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk.
        writeFile.getFD().sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized (writes) {
            if (enabledWriteThread) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if (enabledWriteThread) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

    public File getDirectory() {
        return directory;
    }
}