001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb.disk.page;
018
019 import java.io.ByteArrayInputStream;
020 import java.io.ByteArrayOutputStream;
021 import java.io.DataInputStream;
022 import java.io.DataOutputStream;
023 import java.io.File;
024 import java.io.FileInputStream;
025 import java.io.FileOutputStream;
026 import java.io.IOException;
027 import java.io.InterruptedIOException;
028 import java.io.RandomAccessFile;
029 import java.util.ArrayList;
030 import java.util.Arrays;
031 import java.util.Collection;
032 import java.util.Collections;
033 import java.util.Iterator;
034 import java.util.LinkedHashMap;
035 import java.util.Map;
036 import java.util.Map.Entry;
037 import java.util.Properties;
038 import java.util.TreeMap;
039 import java.util.concurrent.CountDownLatch;
040 import java.util.concurrent.atomic.AtomicBoolean;
041 import java.util.concurrent.atomic.AtomicLong;
042 import java.util.zip.Adler32;
043 import java.util.zip.Checksum;
044
045 import org.apache.activemq.util.DataByteArrayOutputStream;
046 import org.apache.activemq.util.IOExceptionSupport;
047 import org.apache.activemq.util.IOHelper;
048 import org.apache.activemq.util.IntrospectionSupport;
049 import org.apache.activemq.util.LFUCache;
050 import org.apache.activemq.util.LRUCache;
051 import org.apache.activemq.store.kahadb.disk.util.Sequence;
052 import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
053 import org.slf4j.Logger;
054 import org.slf4j.LoggerFactory;
055
056 /**
057 * A PageFile provides you random access to fixed sized disk pages. This object is not thread safe and therefore access to it should
058 * be externally synchronized.
059 * <p/>
060 * The file has 3 parts:
061 * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
062 * Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent
063 * Page Space: The pages in the page file.
064 */
065 public class PageFile {
066
067 private static final String PAGEFILE_SUFFIX = ".data";
068 private static final String RECOVERY_FILE_SUFFIX = ".redo";
069 private static final String FREE_FILE_SUFFIX = ".free";
070
071 // 4k Default page size.
072 public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4);
073 public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
074 public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);;
075
076 private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
077 private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;
078
079 // Recovery header is (long offset)
080 private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);
081
082 // A PageFile will use a couple of files in this directory
083 private File directory;
084 // And the file names in that directory will be based on this name.
085 private final String name;
086
087 // File handle used for reading pages..
088 private RandomAccessFile readFile;
089 // File handle used for writing pages..
090 private RandomAccessFile writeFile;
// File handle used for writing to the recovery (redo) file..
092 private RandomAccessFile recoveryFile;
093
094 // The size of pages
095 private int pageSize = DEFAULT_PAGE_SIZE;
096
// The minimum amount of space allocated to the recovery file, in number of pages.
098 private int recoveryFileMinPageCount = 1000;
// The max size that we let the recovery file grow to.. it may exceed the max, but the file will get
// resized back to this max size as soon as possible.
101 private int recoveryFileMaxPageCount = 10000;
102 // The number of pages in the current recovery buffer
103 private int recoveryPageCount;
104
105 private AtomicBoolean loaded = new AtomicBoolean();
106 // The number of pages we are aiming to write every time we
107 // write to disk.
108 int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;
109
// The cache of recently used pages.
111 private Map<Long, Page> pageCache;
// Do we keep a cache of recently used pages?
113 private boolean enablePageCaching = true;
114 // How many pages will we keep in the cache?
115 private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;
116
117 // Should first log the page write to the recovery buffer? Avoids partial
118 // page write failures..
119 private boolean enableRecoveryFile = true;
120 // Will we sync writes to disk. Ensures that data will not be lost after a checkpoint()
121 private boolean enableDiskSyncs = true;
122 // Will writes be done in an async thread?
123 private boolean enabledWriteThread = false;
124
// These are used if enabledWriteThread==true
126 private AtomicBoolean stopWriter = new AtomicBoolean();
127 private Thread writerThread;
128 private CountDownLatch checkpointLatch;
129
130 // Keeps track of writes that are being written to disk.
131 private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();
132
133 // Keeps track of free pages.
134 private final AtomicLong nextFreePageId = new AtomicLong();
135 private SequenceSet freeList = new SequenceSet();
136
137 private AtomicLong nextTxid = new AtomicLong();
138
139 // Persistent settings stored in the page file.
140 private MetaData metaData;
141
142 private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
143
144 private boolean useLFRUEviction = false;
145 private float LFUEvictionFactor = 0.2f;
146
147 /**
148 * Use to keep track of updated pages which have not yet been committed.
149 */
150 static class PageWrite {
151 Page page;
152 byte[] current;
153 byte[] diskBound;
154 long currentLocation = -1;
155 long diskBoundLocation = -1;
156 File tmpFile;
157 int length;
158
159 public PageWrite(Page page, byte[] data) {
160 this.page = page;
161 current = data;
162 }
163
164 public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
165 this.page = page;
166 this.currentLocation = currentLocation;
167 this.tmpFile = tmpFile;
168 this.length = length;
169 }
170
171 public void setCurrent(Page page, byte[] data) {
172 this.page = page;
173 current = data;
174 currentLocation = -1;
175 diskBoundLocation = -1;
176 }
177
178 public void setCurrentLocation(Page page, long location, int length) {
179 this.page = page;
180 this.currentLocation = location;
181 this.length = length;
182 this.current = null;
183 }
184
185 @Override
186 public String toString() {
187 return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
188 }
189
190 @SuppressWarnings("unchecked")
191 public Page getPage() {
192 return page;
193 }
194
195 public byte[] getDiskBound() throws IOException {
196 if (diskBound == null && diskBoundLocation != -1) {
197 diskBound = new byte[length];
198 RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
199 file.seek(diskBoundLocation);
200 file.read(diskBound);
201 file.close();
202 diskBoundLocation = -1;
203 }
204 return diskBound;
205 }
206
207 void begin() {
208 if (currentLocation != -1) {
209 diskBoundLocation = currentLocation;
210 } else {
211 diskBound = current;
212 }
213 current = null;
214 currentLocation = -1;
215 }
216
217 /**
218 * @return true if there is no pending writes to do.
219 */
220 boolean done() {
221 diskBoundLocation = -1;
222 diskBound = null;
223 return current == null || currentLocation == -1;
224 }
225
226 boolean isDone() {
227 return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
228 }
229 }
230
231 /**
232 * The MetaData object hold the persistent data associated with a PageFile object.
233 */
234 public static class MetaData {
235
236 String fileType;
237 String fileTypeVersion;
238
239 long metaDataTxId = -1;
240 int pageSize;
241 boolean cleanShutdown;
242 long lastTxId;
243 long freePages;
244
245 public String getFileType() {
246 return fileType;
247 }
248
249 public void setFileType(String fileType) {
250 this.fileType = fileType;
251 }
252
253 public String getFileTypeVersion() {
254 return fileTypeVersion;
255 }
256
257 public void setFileTypeVersion(String version) {
258 this.fileTypeVersion = version;
259 }
260
261 public long getMetaDataTxId() {
262 return metaDataTxId;
263 }
264
265 public void setMetaDataTxId(long metaDataTxId) {
266 this.metaDataTxId = metaDataTxId;
267 }
268
269 public int getPageSize() {
270 return pageSize;
271 }
272
273 public void setPageSize(int pageSize) {
274 this.pageSize = pageSize;
275 }
276
277 public boolean isCleanShutdown() {
278 return cleanShutdown;
279 }
280
281 public void setCleanShutdown(boolean cleanShutdown) {
282 this.cleanShutdown = cleanShutdown;
283 }
284
285 public long getLastTxId() {
286 return lastTxId;
287 }
288
289 public void setLastTxId(long lastTxId) {
290 this.lastTxId = lastTxId;
291 }
292
293 public long getFreePages() {
294 return freePages;
295 }
296
297 public void setFreePages(long value) {
298 this.freePages = value;
299 }
300 }
301
302 public Transaction tx() {
303 assertLoaded();
304 return new Transaction(this);
305 }
306
/**
 * Creates a PageFile whose data files live in the given directory and are named after the
 * given name. No files are touched until the page file is loaded.
 *
 * @param directory the directory holding the page file's data files
 * @param name      the base name used to derive the data file names
 */
public PageFile(File directory, String name) {
    this.name = name;
    this.directory = directory;
}
314
315 /**
316 * Deletes the files used by the PageFile object. This method can only be used when this object is not loaded.
317 *
318 * @throws IOException if the files cannot be deleted.
319 * @throws IllegalStateException if this PageFile is loaded
320 */
321 public void delete() throws IOException {
322 if (loaded.get()) {
323 throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
324 }
325 delete(getMainPageFile());
326 delete(getFreeFile());
327 delete(getRecoveryFile());
328 }
329
330 public void archive() throws IOException {
331 if (loaded.get()) {
332 throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
333 }
334 long timestamp = System.currentTimeMillis();
335 archive(getMainPageFile(), String.valueOf(timestamp));
336 archive(getFreeFile(), String.valueOf(timestamp));
337 archive(getRecoveryFile(), String.valueOf(timestamp));
338 }
339
340 /**
341 * @param file
342 * @throws IOException
343 */
344 private void delete(File file) throws IOException {
345 if (file.exists() && !file.delete()) {
346 throw new IOException("Could not delete: " + file.getPath());
347 }
348 }
349
350 private void archive(File file, String suffix) throws IOException {
351 if (file.exists()) {
352 File archive = new File(file.getPath() + "-" + suffix);
353 if (!file.renameTo(archive)) {
354 throw new IOException("Could not archive: " + file.getPath() + " to " + file.getPath());
355 }
356 }
357 }
358
359 /**
360 * Loads the page file so that it can be accessed for read/write purposes. This allocates OS resources. If this is the
361 * first time the page file is loaded, then this creates the page file in the file system.
362 *
363 * @throws IOException If the page file cannot be loaded. This could be cause the existing page file is corrupt is a bad version or if
364 * there was a disk error.
365 * @throws IllegalStateException If the page file was already loaded.
366 */
367 public void load() throws IOException, IllegalStateException {
368 if (loaded.compareAndSet(false, true)) {
369
370 if (enablePageCaching) {
371 if (isUseLFRUEviction()) {
372 pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
373 } else {
374 pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
375 }
376 }
377
378 File file = getMainPageFile();
379 IOHelper.mkdirs(file.getParentFile());
380 writeFile = new RandomAccessFile(file, "rw");
381 readFile = new RandomAccessFile(file, "r");
382
383 if (readFile.length() > 0) {
384 // Load the page size setting cause that can't change once the file is created.
385 loadMetaData();
386 pageSize = metaData.getPageSize();
387 } else {
388 // Store the page size setting cause that can't change once the file is created.
389 metaData = new MetaData();
390 metaData.setFileType(PageFile.class.getName());
391 metaData.setFileTypeVersion("1");
392 metaData.setPageSize(getPageSize());
393 metaData.setCleanShutdown(true);
394 metaData.setFreePages(-1);
395 metaData.setLastTxId(0);
396 storeMetaData();
397 }
398
399 if (enableRecoveryFile) {
400 recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
401 }
402
403 if (metaData.isCleanShutdown()) {
404 nextTxid.set(metaData.getLastTxId() + 1);
405 if (metaData.getFreePages() > 0) {
406 loadFreeList();
407 }
408 } else {
409 LOG.debug(toString() + ", Recovering page file...");
410 nextTxid.set(redoRecoveryUpdates());
411
412 // Scan all to find the free pages.
413 freeList = new SequenceSet();
414 for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
415 Page page = i.next();
416 if (page.getType() == Page.PAGE_FREE_TYPE) {
417 freeList.add(page.getPageId());
418 }
419 }
420 }
421
422 metaData.setCleanShutdown(false);
423 storeMetaData();
424 getFreeFile().delete();
425
426 if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
427 writeFile.setLength(PAGE_FILE_HEADER_SIZE);
428 }
429 nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);
430 startWriter();
431
432 } else {
433 throw new IllegalStateException("Cannot load the page file when it is already loaded.");
434 }
435 }
436
437
438 /**
439 * Unloads a previously loaded PageFile. This deallocates OS related resources like file handles.
440 * once unloaded, you can no longer use the page file to read or write Pages.
441 *
442 * @throws IOException if there was a disk error occurred while closing the down the page file.
443 * @throws IllegalStateException if the PageFile is not loaded
444 */
445 public void unload() throws IOException {
446 if (loaded.compareAndSet(true, false)) {
447 flush();
448 try {
449 stopWriter();
450 } catch (InterruptedException e) {
451 throw new InterruptedIOException();
452 }
453
454 if (freeList.isEmpty()) {
455 metaData.setFreePages(0);
456 } else {
457 storeFreeList();
458 metaData.setFreePages(freeList.size());
459 }
460
461 metaData.setLastTxId(nextTxid.get() - 1);
462 metaData.setCleanShutdown(true);
463 storeMetaData();
464
465 if (readFile != null) {
466 readFile.close();
467 readFile = null;
468 writeFile.close();
469 writeFile = null;
470 if (enableRecoveryFile) {
471 recoveryFile.close();
472 recoveryFile = null;
473 }
474 freeList.clear();
475 if (pageCache != null) {
476 pageCache = null;
477 }
478 synchronized (writes) {
479 writes.clear();
480 }
481 }
482 } else {
483 throw new IllegalStateException("Cannot unload the page file when it is not loaded");
484 }
485 }
486
487 public boolean isLoaded() {
488 return loaded.get();
489 }
490
491 /**
492 * Flush and sync all write buffers to disk.
493 *
494 * @throws IOException If an disk error occurred.
495 */
496 public void flush() throws IOException {
497
498 if (enabledWriteThread && stopWriter.get()) {
499 throw new IOException("Page file already stopped: checkpointing is not allowed");
500 }
501
502 // Setup a latch that gets notified when all buffered writes hits the disk.
503 CountDownLatch checkpointLatch;
504 synchronized (writes) {
505 if (writes.isEmpty()) {
506 return;
507 }
508 if (enabledWriteThread) {
509 if (this.checkpointLatch == null) {
510 this.checkpointLatch = new CountDownLatch(1);
511 }
512 checkpointLatch = this.checkpointLatch;
513 writes.notify();
514 } else {
515 writeBatch();
516 return;
517 }
518 }
519 try {
520 checkpointLatch.await();
521 } catch (InterruptedException e) {
522 InterruptedIOException ioe = new InterruptedIOException();
523 ioe.initCause(e);
524 throw ioe;
525 }
526 }
527
528
529 public String toString() {
530 return "Page File: " + getMainPageFile();
531 }
532
533 ///////////////////////////////////////////////////////////////////
534 // Private Implementation Methods
535 ///////////////////////////////////////////////////////////////////
536 private File getMainPageFile() {
537 return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
538 }
539
540 public File getFreeFile() {
541 return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
542 }
543
544 public File getRecoveryFile() {
545 return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
546 }
547
548 public long toOffset(long pageId) {
549 return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
550 }
551
552 private void loadMetaData() throws IOException {
553
554 ByteArrayInputStream is;
555 MetaData v1 = new MetaData();
556 MetaData v2 = new MetaData();
557 try {
558 Properties p = new Properties();
559 byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
560 readFile.seek(0);
561 readFile.readFully(d);
562 is = new ByteArrayInputStream(d);
563 p.load(is);
564 IntrospectionSupport.setProperties(v1, p);
565 } catch (IOException e) {
566 v1 = null;
567 }
568
569 try {
570 Properties p = new Properties();
571 byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
572 readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
573 readFile.readFully(d);
574 is = new ByteArrayInputStream(d);
575 p.load(is);
576 IntrospectionSupport.setProperties(v2, p);
577 } catch (IOException e) {
578 v2 = null;
579 }
580
581 if (v1 == null && v2 == null) {
582 throw new IOException("Could not load page file meta data");
583 }
584
585 if (v1 == null || v1.metaDataTxId < 0) {
586 metaData = v2;
587 } else if (v2 == null || v1.metaDataTxId < 0) {
588 metaData = v1;
589 } else if (v1.metaDataTxId == v2.metaDataTxId) {
590 metaData = v1; // use the first since the 2nd could be a partial..
591 } else {
592 metaData = v2; // use the second cause the first is probably a partial.
593 }
594 }
595
596 private void storeMetaData() throws IOException {
597 // Convert the metadata into a property format
598 metaData.metaDataTxId++;
599 Properties p = new Properties();
600 IntrospectionSupport.getProperties(metaData, p, null);
601
602 ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
603 p.store(os, "");
604 if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
605 throw new IOException("Configuation is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
606 }
607 // Fill the rest with space...
608 byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
609 Arrays.fill(filler, (byte) ' ');
610 os.write(filler);
611 os.flush();
612
613 byte[] d = os.toByteArray();
614
615 // So we don't loose it.. write it 2 times...
616 writeFile.seek(0);
617 writeFile.write(d);
618 writeFile.getFD().sync();
619 writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
620 writeFile.write(d);
621 writeFile.getFD().sync();
622 }
623
624 private void storeFreeList() throws IOException {
625 FileOutputStream os = new FileOutputStream(getFreeFile());
626 DataOutputStream dos = new DataOutputStream(os);
627 SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
628 dos.close();
629 }
630
631 private void loadFreeList() throws IOException {
632 freeList.clear();
633 FileInputStream is = new FileInputStream(getFreeFile());
634 DataInputStream dis = new DataInputStream(is);
635 freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
636 dis.close();
637 }
638
639 ///////////////////////////////////////////////////////////////////
640 // Property Accessors
641 ///////////////////////////////////////////////////////////////////
642
643 /**
644 * Is the recovery buffer used to double buffer page writes. Enabled by default.
645 *
646 * @return is the recovery buffer enabled.
647 */
648 public boolean isEnableRecoveryFile() {
649 return enableRecoveryFile;
650 }
651
652 /**
653 * Sets if the recovery buffer uses to double buffer page writes. Enabled by default. Disabling this
654 * may potentially cause partial page writes which can lead to page file corruption.
655 */
656 public void setEnableRecoveryFile(boolean doubleBuffer) {
657 assertNotLoaded();
658 this.enableRecoveryFile = doubleBuffer;
659 }
660
661 /**
662 * @return Are page writes synced to disk?
663 */
664 public boolean isEnableDiskSyncs() {
665 return enableDiskSyncs;
666 }
667
668 /**
669 * Allows you enable syncing writes to disk.
670 */
671 public void setEnableDiskSyncs(boolean syncWrites) {
672 assertNotLoaded();
673 this.enableDiskSyncs = syncWrites;
674 }
675
676 /**
677 * @return the page size
678 */
679 public int getPageSize() {
680 return this.pageSize;
681 }
682
683 /**
684 * @return the amount of content data that a page can hold.
685 */
686 public int getPageContentSize() {
687 return this.pageSize - Page.PAGE_HEADER_SIZE;
688 }
689
690 /**
691 * Configures the page size used by the page file. By default it is 4k. Once a page file is created on disk,
692 * subsequent loads of that file will use the original pageSize. Once the PageFile is loaded, this setting
693 * can no longer be changed.
694 *
695 * @param pageSize the pageSize to set
696 * @throws IllegalStateException once the page file is loaded.
697 */
698 public void setPageSize(int pageSize) throws IllegalStateException {
699 assertNotLoaded();
700 this.pageSize = pageSize;
701 }
702
703 /**
704 * @return true if read page caching is enabled
705 */
706 public boolean isEnablePageCaching() {
707 return this.enablePageCaching;
708 }
709
710 /**
711 * @param enablePageCaching allows you to enable read page caching
712 */
713 public void setEnablePageCaching(boolean enablePageCaching) {
714 assertNotLoaded();
715 this.enablePageCaching = enablePageCaching;
716 }
717
718 /**
719 * @return the maximum number of pages that will get stored in the read page cache.
720 */
721 public int getPageCacheSize() {
722 return this.pageCacheSize;
723 }
724
725 /**
726 * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
727 */
728 public void setPageCacheSize(int pageCacheSize) {
729 assertNotLoaded();
730 this.pageCacheSize = pageCacheSize;
731 }
732
733 public boolean isEnabledWriteThread() {
734 return enabledWriteThread;
735 }
736
737 public void setEnableWriteThread(boolean enableAsyncWrites) {
738 assertNotLoaded();
739 this.enabledWriteThread = enableAsyncWrites;
740 }
741
742 public long getDiskSize() throws IOException {
743 return toOffset(nextFreePageId.get());
744 }
745
746 /**
747 * @return the number of pages allocated in the PageFile
748 */
749 public long getPageCount() {
750 return nextFreePageId.get();
751 }
752
753 public int getRecoveryFileMinPageCount() {
754 return recoveryFileMinPageCount;
755 }
756
757 public long getFreePageCount() {
758 assertLoaded();
759 return freeList.rangeSize();
760 }
761
762 public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
763 assertNotLoaded();
764 this.recoveryFileMinPageCount = recoveryFileMinPageCount;
765 }
766
767 public int getRecoveryFileMaxPageCount() {
768 return recoveryFileMaxPageCount;
769 }
770
771 public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
772 assertNotLoaded();
773 this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
774 }
775
776 public int getWriteBatchSize() {
777 return writeBatchSize;
778 }
779
780 public void setWriteBatchSize(int writeBatchSize) {
781 this.writeBatchSize = writeBatchSize;
782 }
783
784 public float getLFUEvictionFactor() {
785 return LFUEvictionFactor;
786 }
787
788 public void setLFUEvictionFactor(float LFUEvictionFactor) {
789 this.LFUEvictionFactor = LFUEvictionFactor;
790 }
791
792 public boolean isUseLFRUEviction() {
793 return useLFRUEviction;
794 }
795
796 public void setUseLFRUEviction(boolean useLFRUEviction) {
797 this.useLFRUEviction = useLFRUEviction;
798 }
799
800 ///////////////////////////////////////////////////////////////////
801 // Package Protected Methods exposed to Transaction
802 ///////////////////////////////////////////////////////////////////
803
804 /**
805 * @throws IllegalStateException if the page file is not loaded.
806 */
807 void assertLoaded() throws IllegalStateException {
808 if (!loaded.get()) {
809 throw new IllegalStateException("PageFile is not loaded");
810 }
811 }
812
813 void assertNotLoaded() throws IllegalStateException {
814 if (loaded.get()) {
815 throw new IllegalStateException("PageFile is loaded");
816 }
817 }
818
819 /**
820 * Allocates a block of free pages that you can write data to.
821 *
822 * @param count the number of sequential pages to allocate
823 * @return the first page of the sequential set.
824 * @throws IOException If an disk error occurred.
825 * @throws IllegalStateException if the PageFile is not loaded
826 */
827 <T> Page<T> allocate(int count) throws IOException {
828 assertLoaded();
829 if (count <= 0) {
830 throw new IllegalArgumentException("The allocation count must be larger than zero");
831 }
832
833 Sequence seq = freeList.removeFirstSequence(count);
834
835 // We may need to create new free pages...
836 if (seq == null) {
837
838 Page<T> first = null;
839 int c = count;
840
841 // Perform the id's only once....
842 long pageId = nextFreePageId.getAndAdd(count);
843 long writeTxnId = nextTxid.getAndAdd(count);
844
845 while (c-- > 0) {
846 Page<T> page = new Page<T>(pageId++);
847 page.makeFree(writeTxnId++);
848
849 if (first == null) {
850 first = page;
851 }
852
853 addToCache(page);
854 DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
855 page.write(out);
856 write(page, out.getData());
857
858 // LOG.debug("allocate writing: "+page.getPageId());
859 }
860
861 return first;
862 }
863
864 Page<T> page = new Page<T>(seq.getFirst());
865 page.makeFree(0);
866 // LOG.debug("allocated: "+page.getPageId());
867 return page;
868 }
869
870 long getNextWriteTransactionId() {
871 return nextTxid.incrementAndGet();
872 }
873
874 synchronized void readPage(long pageId, byte[] data) throws IOException {
875 readFile.seek(toOffset(pageId));
876 readFile.readFully(data);
877 }
878
879 public void freePage(long pageId) {
880 freeList.add(pageId);
881 removeFromCache(pageId);
882 }
883
884 @SuppressWarnings("unchecked")
885 private <T> void write(Page<T> page, byte[] data) throws IOException {
886 final PageWrite write = new PageWrite(page, data);
887 Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
888 public Long getKey() {
889 return write.getPage().getPageId();
890 }
891
892 public PageWrite getValue() {
893 return write;
894 }
895
896 public PageWrite setValue(PageWrite value) {
897 return null;
898 }
899 };
900 Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
901 write(Arrays.asList(entries));
902 }
903
904 void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
905 synchronized (writes) {
906 if (enabledWriteThread) {
907 while (writes.size() >= writeBatchSize && !stopWriter.get()) {
908 try {
909 writes.wait();
910 } catch (InterruptedException e) {
911 Thread.currentThread().interrupt();
912 throw new InterruptedIOException();
913 }
914 }
915 }
916
917 boolean longTx = false;
918
919 for (Map.Entry<Long, PageWrite> entry : updates) {
920 Long key = entry.getKey();
921 PageWrite value = entry.getValue();
922 PageWrite write = writes.get(key);
923 if (write == null) {
924 writes.put(key, value);
925 } else {
926 if (value.currentLocation != -1) {
927 write.setCurrentLocation(value.page, value.currentLocation, value.length);
928 write.tmpFile = value.tmpFile;
929 longTx = true;
930 } else {
931 write.setCurrent(value.page, value.current);
932 }
933 }
934 }
935
936 // Once we start approaching capacity, notify the writer to start writing
937 // sync immediately for long txs
938 if (longTx || canStartWriteBatch()) {
939
940 if (enabledWriteThread) {
941 writes.notify();
942 } else {
943 writeBatch();
944 }
945 }
946 }
947 }
948
949 private boolean canStartWriteBatch() {
950 int capacityUsed = ((writes.size() * 100) / writeBatchSize);
951 if (enabledWriteThread) {
952 // The constant 10 here controls how soon write batches start going to disk..
953 // would be nice to figure out how to auto tune that value. Make to small and
954 // we reduce through put because we are locking the write mutex too often doing writes
955 return capacityUsed >= 10 || checkpointLatch != null;
956 } else {
957 return capacityUsed >= 80 || checkpointLatch != null;
958 }
959 }
960
961 ///////////////////////////////////////////////////////////////////
962 // Cache Related operations
963 ///////////////////////////////////////////////////////////////////
964 @SuppressWarnings("unchecked")
965 <T> Page<T> getFromCache(long pageId) {
966 synchronized (writes) {
967 PageWrite pageWrite = writes.get(pageId);
968 if (pageWrite != null) {
969 return pageWrite.page;
970 }
971 }
972
973 Page<T> result = null;
974 if (enablePageCaching) {
975 result = pageCache.get(pageId);
976 }
977 return result;
978 }
979
980 void addToCache(Page page) {
981 if (enablePageCaching) {
982 pageCache.put(page.getPageId(), page);
983 }
984 }
985
986 void removeFromCache(long pageId) {
987 if (enablePageCaching) {
988 pageCache.remove(pageId);
989 }
990 }
991
992 ///////////////////////////////////////////////////////////////////
993 // Internal Double write implementation follows...
994 ///////////////////////////////////////////////////////////////////
995
996 private void pollWrites() {
997 try {
998 while (!stopWriter.get()) {
999 // Wait for a notification...
1000 synchronized (writes) {
1001 writes.notifyAll();
1002
1003 // If there is not enough to write, wait for a notification...
1004 while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
1005 writes.wait(100);
1006 }
1007
1008 if (writes.isEmpty()) {
1009 releaseCheckpointWaiter();
1010 }
1011 }
1012 writeBatch();
1013 }
1014 } catch (Throwable e) {
1015 LOG.info("An exception was raised while performing poll writes", e);
1016 } finally {
1017 releaseCheckpointWaiter();
1018 }
1019 }
1020
1021 private void writeBatch() throws IOException {
1022
1023 CountDownLatch checkpointLatch;
1024 ArrayList<PageWrite> batch;
1025 synchronized (writes) {
1026 // If there is not enough to write, wait for a notification...
1027
1028 batch = new ArrayList<PageWrite>(writes.size());
1029 // build a write batch from the current write cache.
1030 for (PageWrite write : writes.values()) {
1031 batch.add(write);
1032 // Move the current write to the diskBound write, this lets folks update the
1033 // page again without blocking for this write.
1034 write.begin();
1035 if (write.diskBound == null && write.diskBoundLocation == -1) {
1036 batch.remove(write);
1037 }
1038 }
1039
1040 // Grab on to the existing checkpoint latch cause once we do this write we can
1041 // release the folks that were waiting for those writes to hit disk.
1042 checkpointLatch = this.checkpointLatch;
1043 this.checkpointLatch = null;
1044 }
1045
1046 Checksum checksum = new Adler32();
1047 if (enableRecoveryFile) {
1048 recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
1049 }
1050 for (PageWrite w : batch) {
1051 if (enableRecoveryFile) {
1052 try {
1053 checksum.update(w.getDiskBound(), 0, pageSize);
1054 } catch (Throwable t) {
1055 throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
1056 }
1057 recoveryFile.writeLong(w.page.getPageId());
1058 recoveryFile.write(w.getDiskBound(), 0, pageSize);
1059 }
1060
1061 writeFile.seek(toOffset(w.page.getPageId()));
1062 writeFile.write(w.getDiskBound(), 0, pageSize);
1063 w.done();
1064 }
1065
1066 try {
1067 if (enableRecoveryFile) {
1068 // Can we shrink the recovery buffer??
1069 if (recoveryPageCount > recoveryFileMaxPageCount) {
1070 int t = Math.max(recoveryFileMinPageCount, batch.size());
1071 recoveryFile.setLength(recoveryFileSizeForPages(t));
1072 }
1073
1074 // Record the page writes in the recovery buffer.
1075 recoveryFile.seek(0);
1076 // Store the next tx id...
1077 recoveryFile.writeLong(nextTxid.get());
1078 // Store the checksum for thw write batch so that on recovery we
1079 // know if we have a consistent
1080 // write batch on disk.
1081 recoveryFile.writeLong(checksum.getValue());
1082 // Write the # of pages that will follow
1083 recoveryFile.writeInt(batch.size());
1084 }
1085
1086 if (enableDiskSyncs) {
1087 // Sync to make sure recovery buffer writes land on disk..
1088 if (enableRecoveryFile) {
1089 recoveryFile.getFD().sync();
1090 }
1091 writeFile.getFD().sync();
1092 }
1093 } finally {
1094 synchronized (writes) {
1095 for (PageWrite w : batch) {
1096 // If there are no more pending writes, then remove it from
1097 // the write cache.
1098 if (w.isDone()) {
1099 writes.remove(w.page.getPageId());
1100 if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
1101 if (!w.tmpFile.delete()) {
1102 throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
1103 }
1104 tmpFilesForRemoval.remove(w.tmpFile);
1105 }
1106 }
1107 }
1108 }
1109
1110 if (checkpointLatch != null) {
1111 checkpointLatch.countDown();
1112 }
1113 }
1114 }
1115
1116 public void removeTmpFile(File file) {
1117 tmpFilesForRemoval.add(file);
1118 }
1119
1120 private long recoveryFileSizeForPages(int pageCount) {
1121 return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
1122 }
1123
1124 private void releaseCheckpointWaiter() {
1125 if (checkpointLatch != null) {
1126 checkpointLatch.countDown();
1127 checkpointLatch = null;
1128 }
1129 }
1130
1131 /**
1132 * Inspects the recovery buffer and re-applies any
1133 * partially applied page writes.
1134 *
1135 * @return the next transaction id that can be used.
1136 */
1137 private long redoRecoveryUpdates() throws IOException {
1138 if (!enableRecoveryFile) {
1139 return 0;
1140 }
1141 recoveryPageCount = 0;
1142
1143 // Are we initializing the recovery file?
1144 if (recoveryFile.length() == 0) {
1145 // Write an empty header..
1146 recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
1147 // Preallocate the minium size for better performance.
1148 recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
1149 return 0;
1150 }
1151
1152 // How many recovery pages do we have in the recovery buffer?
1153 recoveryFile.seek(0);
1154 long nextTxId = recoveryFile.readLong();
1155 long expectedChecksum = recoveryFile.readLong();
1156 int pageCounter = recoveryFile.readInt();
1157
1158 recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
1159 Checksum checksum = new Adler32();
1160 LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
1161 try {
1162 for (int i = 0; i < pageCounter; i++) {
1163 long offset = recoveryFile.readLong();
1164 byte[] data = new byte[pageSize];
1165 if (recoveryFile.read(data, 0, pageSize) != pageSize) {
1166 // Invalid recovery record, Could not fully read the data". Probably due to a partial write to the recovery buffer
1167 return nextTxId;
1168 }
1169 checksum.update(data, 0, pageSize);
1170 batch.put(offset, data);
1171 }
1172 } catch (Exception e) {
1173 // If an error occurred it was cause the redo buffer was not full written out correctly.. so don't redo it.
1174 // as the pages should still be consistent.
1175 LOG.debug("Redo buffer was not fully intact: ", e);
1176 return nextTxId;
1177 }
1178
1179 recoveryPageCount = pageCounter;
1180
1181 // If the checksum is not valid then the recovery buffer was partially written to disk.
1182 if (checksum.getValue() != expectedChecksum) {
1183 return nextTxId;
1184 }
1185
1186 // Re-apply all the writes in the recovery buffer.
1187 for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
1188 writeFile.seek(toOffset(e.getKey()));
1189 writeFile.write(e.getValue());
1190 }
1191
1192 // And sync it to disk
1193 writeFile.getFD().sync();
1194 return nextTxId;
1195 }
1196
1197 private void startWriter() {
1198 synchronized (writes) {
1199 if (enabledWriteThread) {
1200 stopWriter.set(false);
1201 writerThread = new Thread("KahaDB Page Writer") {
1202 @Override
1203 public void run() {
1204 pollWrites();
1205 }
1206 };
1207 writerThread.setPriority(Thread.MAX_PRIORITY);
1208 writerThread.setDaemon(true);
1209 writerThread.start();
1210 }
1211 }
1212 }
1213
1214 private void stopWriter() throws InterruptedException {
1215 if (enabledWriteThread) {
1216 stopWriter.set(true);
1217 writerThread.join();
1218 }
1219 }
1220
1221 public File getFile() {
1222 return getMainPageFile();
1223 }
1224
1225 public File getDirectory() {
1226 return directory;
1227 }
1228 }