001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb.disk.page;
018
019 import org.apache.activemq.store.kahadb.disk.page.PageFile.PageWrite;
020 import org.apache.activemq.store.kahadb.disk.util.*;
021 import org.apache.activemq.util.ByteSequence;
022 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayInputStream;
023 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
024 import org.apache.activemq.util.IOHelper;
025
026 import java.io.*;
027 import java.util.Iterator;
028 import java.util.NoSuchElementException;
029 import java.util.TreeMap;
030
031 /**
032 * The class used to read/update a PageFile object. Using a transaction allows you to
033 * do multiple update operations in a single unit of work.
034 */
035 public class Transaction implements Iterable<Page> {
036
037 private RandomAccessFile tmpFile;
038 private File txFile;
039 private long nextLocation = 0;
040
041 /**
042 * The PageOverflowIOException occurs when a page write is requested
043 * and it's data is larger than what would fit into a single page.
044 */
045 public class PageOverflowIOException extends IOException {
046 private static final long serialVersionUID = 1L;
047
048 public PageOverflowIOException(String message) {
049 super(message);
050 }
051 }
052
053 /**
054 * The InvalidPageIOException is thrown if try to load/store a a page
055 * with an invalid page id.
056 */
057 public class InvalidPageIOException extends IOException {
058 private static final long serialVersionUID = 1L;
059
060 private final long page;
061
062 public InvalidPageIOException(String message, long page) {
063 super(message);
064 this.page = page;
065 }
066
067 public long getPage() {
068 return page;
069 }
070 }
071
072 /**
073 * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
074 *
075 * @param <T> The type of exceptions that operation will throw.
076 */
077 public interface Closure <T extends Throwable> {
078 public void execute(Transaction tx) throws T;
079 }
080
081 /**
082 * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
083 *
084 * @param <R> The type of result that the closure produces.
085 * @param <T> The type of exceptions that operation will throw.
086 */
087 public interface CallableClosure<R, T extends Throwable> {
088 public R execute(Transaction tx) throws T;
089 }
090
091
092 // The page file that this Transaction operates against.
093 private final PageFile pageFile;
094 // If this transaction is updating stuff.. this is the tx of
095 private long writeTransactionId=-1;
096 // List of pages that this transaction has modified.
097 private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
098 // List of pages allocated in this transaction
099 private final SequenceSet allocateList = new SequenceSet();
100 // List of pages freed in this transaction
101 private final SequenceSet freeList = new SequenceSet();
102
103 private long maxTransactionSize = Long.getLong("maxKahaDBTxSize", 10485760L);
104
105 private long size = 0;
106
107 Transaction(PageFile pageFile) {
108 this.pageFile = pageFile;
109 }
110
111 /**
112 * @return the page file that created this Transaction
113 */
114 public PageFile getPageFile() {
115 return this.pageFile;
116 }
117
118 /**
119 * Allocates a free page that you can write data to.
120 *
121 * @return a newly allocated page.
122 * @throws IOException
123 * If an disk error occurred.
124 * @throws IllegalStateException
125 * if the PageFile is not loaded
126 */
127 public <T> Page<T> allocate() throws IOException {
128 return allocate(1);
129 }
130
131 /**
132 * Allocates a block of free pages that you can write data to.
133 *
134 * @param count the number of sequential pages to allocate
135 * @return the first page of the sequential set.
136 * @throws IOException
137 * If an disk error occurred.
138 * @throws IllegalStateException
139 * if the PageFile is not loaded
140 */
141 public <T> Page<T> allocate(int count) throws IOException {
142 Page<T> rc = pageFile.allocate(count);
143 allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
144 return rc;
145 }
146
147 /**
148 * Frees up a previously allocated page so that it can be re-allocated again.
149 *
150 * @param pageId the page to free up
151 * @throws IOException
152 * If an disk error occurred.
153 * @throws IllegalStateException
154 * if the PageFile is not loaded
155 */
156 public void free(long pageId) throws IOException {
157 free(load(pageId, null));
158 }
159
160 /**
161 * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
162 *
163 * @param pageId the initial page of the sequence that will be getting freed
164 * @param count the number of pages in the sequence
165 *
166 * @throws IOException
167 * If an disk error occurred.
168 * @throws IllegalStateException
169 * if the PageFile is not loaded
170 */
171 public void free(long pageId, int count) throws IOException {
172 free(load(pageId, null), count);
173 }
174
175 /**
176 * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
177 *
178 * @param page the initial page of the sequence that will be getting freed
179 * @param count the number of pages in the sequence
180 *
181 * @throws IOException
182 * If an disk error occurred.
183 * @throws IllegalStateException
184 * if the PageFile is not loaded
185 */
186 public <T> void free(Page<T> page, int count) throws IOException {
187 pageFile.assertLoaded();
188 long offsetPage = page.getPageId();
189 while (count-- > 0) {
190 if (page == null) {
191 page = load(offsetPage, null);
192 }
193 free(page);
194 page = null;
195 // Increment the offsetPage value since using it depends on the current count.
196 offsetPage++;
197 }
198 }
199
200 /**
201 * Frees up a previously allocated page so that it can be re-allocated again.
202 *
203 * @param page the page to free up
204 * @throws IOException
205 * If an disk error occurred.
206 * @throws IllegalStateException
207 * if the PageFile is not loaded
208 */
209 public <T> void free(Page<T> page) throws IOException {
210 pageFile.assertLoaded();
211
212 // We may need loop to free up a page chain.
213 while (page != null) {
214
215 // Is it already free??
216 if (page.getType() == Page.PAGE_FREE_TYPE) {
217 return;
218 }
219
220 Page<T> next = null;
221 if (page.getType() == Page.PAGE_PART_TYPE) {
222 next = load(page.getNext(), null);
223 }
224
225 page.makeFree(getWriteTransactionId());
226 // ensure free page is visible while write is pending
227 pageFile.addToCache(page.copy());
228
229 DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
230 page.write(out);
231 write(page, out.getData());
232
233 freeList.add(page.getPageId());
234 page = next;
235 }
236 }
237
238 /**
239 *
240 * @param page
241 * the page to write. The Page object must be fully populated with a valid pageId, type, and data.
242 * @param marshaller
243 * the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
244 * @param overflow
245 * If true, then if the page data marshalls to a bigger size than can fit in one page, then additional
246 * overflow pages are automatically allocated and chained to this page to store all the data. If false,
247 * and the overflow condition would occur, then the PageOverflowIOException is thrown.
248 * @throws IOException
249 * If an disk error occurred.
250 * @throws PageOverflowIOException
251 * If the page data marshalls to size larger than maximum page size and overflow was false.
252 * @throws IllegalStateException
253 * if the PageFile is not loaded
254 */
255 public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
256 DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
257 if (marshaller != null) {
258 marshaller.writePayload(page.get(), out);
259 }
260 out.close();
261 }
262
263 /**
264 * @throws IOException
265 */
266 public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
267 pageFile.assertLoaded();
268
269 // Copy to protect against the end user changing
270 // the page instance while we are doing a write.
271 final Page copy = page.copy();
272 pageFile.addToCache(copy);
273
274 //
275 // To support writing VERY large data, we override the output stream so
276 // that we
277 // we do the page writes incrementally while the data is being
278 // marshalled.
279 DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
280 Page current = copy;
281
282 @SuppressWarnings("unchecked")
283 @Override
284 protected void onWrite() throws IOException {
285
286 // Are we at an overflow condition?
287 final int pageSize = pageFile.getPageSize();
288 if (pos >= pageSize) {
289 // If overflow is allowed
290 if (overflow) {
291
292 do {
293 Page next;
294 if (current.getType() == Page.PAGE_PART_TYPE) {
295 next = load(current.getNext(), null);
296 } else {
297 next = allocate();
298 }
299
300 next.txId = current.txId;
301
302 // Write the page header
303 int oldPos = pos;
304 pos = 0;
305
306 current.makePagePart(next.getPageId(), getWriteTransactionId());
307 current.write(this);
308
309 // Do the page write..
310 byte[] data = new byte[pageSize];
311 System.arraycopy(buf, 0, data, 0, pageSize);
312 Transaction.this.write(current, data);
313
314 // make the new link visible
315 pageFile.addToCache(current);
316
317 // Reset for the next page chunk
318 pos = 0;
319 // The page header marshalled after the data is written.
320 skip(Page.PAGE_HEADER_SIZE);
321 // Move the overflow data after the header.
322 System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
323 pos += oldPos - pageSize;
324 current = next;
325
326 } while (pos > pageSize);
327 } else {
328 throw new PageOverflowIOException("Page overflow.");
329 }
330 }
331
332 }
333
334 @Override
335 public void close() throws IOException {
336 super.close();
337
338 // We need to free up the rest of the page chain..
339 if (current.getType() == Page.PAGE_PART_TYPE) {
340 free(current.getNext());
341 }
342
343 current.makePageEnd(pos, getWriteTransactionId());
344
345 // make visible as end page
346 pageFile.addToCache(current);
347
348 // Write the header..
349 pos = 0;
350 current.write(this);
351
352 Transaction.this.write(current, buf);
353 }
354 };
355
356 // The page header marshaled after the data is written.
357 out.skip(Page.PAGE_HEADER_SIZE);
358 return out;
359 }
360
361 /**
362 * Loads a page from disk.
363 *
364 * @param pageId
365 * the id of the page to load
366 * @param marshaller
367 * the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
368 * @return The page with the given id
369 * @throws IOException
370 * If an disk error occurred.
371 * @throws IllegalStateException
372 * if the PageFile is not loaded
373 */
374 public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
375 pageFile.assertLoaded();
376 Page<T> page = new Page<T>(pageId);
377 load(page, marshaller);
378 return page;
379 }
380
381 /**
382 * Loads a page from disk.
383 *
384 * @param page - The pageId field must be properly set
385 * @param marshaller
386 * the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
387 * @throws IOException
388 * If an disk error occurred.
389 * @throws InvalidPageIOException
390 * If the page is is not valid.
391 * @throws IllegalStateException
392 * if the PageFile is not loaded
393 */
394 @SuppressWarnings("unchecked")
395 public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
396 pageFile.assertLoaded();
397
398 // Can't load invalid offsets...
399 long pageId = page.getPageId();
400 if (pageId < 0) {
401 throw new InvalidPageIOException("Page id is not valid", pageId);
402 }
403
404 // It might be a page this transaction has modified...
405 PageWrite update = writes.get(pageId);
406 if (update != null) {
407 page.copy(update.getPage());
408 return;
409 }
410
411 // We may be able to get it from the cache...
412 Page<T> t = pageFile.getFromCache(pageId);
413 if (t != null) {
414 page.copy(t);
415 return;
416 }
417
418 if (marshaller != null) {
419 // Full page read..
420 InputStream is = openInputStream(page);
421 DataInputStream dataIn = new DataInputStream(is);
422 page.set(marshaller.readPayload(dataIn));
423 is.close();
424 } else {
425 // Page header read.
426 DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
427 pageFile.readPage(pageId, in.getRawData());
428 page.read(in);
429 page.set(null);
430 }
431
432 // Cache it.
433 if (marshaller != null) {
434 pageFile.addToCache(page);
435 }
436 }
437
438 /**
439 * @see org.apache.activemq.store.kahadb.disk.page.Transaction#load(org.apache.activemq.store.kahadb.disk.page.Page,
440 * org.apache.activemq.store.kahadb.disk.util.Marshaller)
441 */
442 public InputStream openInputStream(final Page p) throws IOException {
443
444 return new InputStream() {
445
446 private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
447 private Page page = readPage(p);
448 private int pageCount = 1;
449
450 private Page markPage;
451 private ByteSequence markChunk;
452
453 private Page readPage(Page page) throws IOException {
454 // Read the page data
455
456 pageFile.readPage(page.getPageId(), chunk.getData());
457
458 chunk.setOffset(0);
459 chunk.setLength(pageFile.getPageSize());
460
461 DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
462 page.read(in);
463
464 chunk.setOffset(Page.PAGE_HEADER_SIZE);
465 if (page.getType() == Page.PAGE_END_TYPE) {
466 chunk.setLength((int)(page.getNext()));
467 }
468
469 if (page.getType() == Page.PAGE_FREE_TYPE) {
470 throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free");
471 }
472
473 return page;
474 }
475
476 public int read() throws IOException {
477 if (!atEOF()) {
478 return chunk.data[chunk.offset++] & 0xff;
479 } else {
480 return -1;
481 }
482 }
483
484 private boolean atEOF() throws IOException {
485 if (chunk.offset < chunk.length) {
486 return false;
487 }
488 if (page.getType() == Page.PAGE_END_TYPE) {
489 return true;
490 }
491 fill();
492 return chunk.offset >= chunk.length;
493 }
494
495 private void fill() throws IOException {
496 page = readPage(new Page(page.getNext()));
497 pageCount++;
498 }
499
500 public int read(byte[] b) throws IOException {
501 return read(b, 0, b.length);
502 }
503
504 public int read(byte b[], int off, int len) throws IOException {
505 if (!atEOF()) {
506 int rc = 0;
507 while (!atEOF() && rc < len) {
508 len = Math.min(len, chunk.length - chunk.offset);
509 if (len > 0) {
510 System.arraycopy(chunk.data, chunk.offset, b, off, len);
511 chunk.offset += len;
512 }
513 rc += len;
514 }
515 return rc;
516 } else {
517 return -1;
518 }
519 }
520
521 public long skip(long len) throws IOException {
522 if (atEOF()) {
523 int rc = 0;
524 while (!atEOF() && rc < len) {
525 len = Math.min(len, chunk.length - chunk.offset);
526 if (len > 0) {
527 chunk.offset += len;
528 }
529 rc += len;
530 }
531 return rc;
532 } else {
533 return -1;
534 }
535 }
536
537 public int available() {
538 return chunk.length - chunk.offset;
539 }
540
541 public boolean markSupported() {
542 return true;
543 }
544
545 public void mark(int markpos) {
546 markPage = page;
547 byte data[] = new byte[pageFile.getPageSize()];
548 System.arraycopy(chunk.getData(), 0, data, 0, pageFile.getPageSize());
549 markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
550 }
551
552 public void reset() {
553 page = markPage;
554 chunk = markChunk;
555 }
556
557 };
558 }
559
560 /**
561 * Allows you to iterate through all active Pages in this object. Pages with type Page.FREE_TYPE are
562 * not included in this iteration.
563 *
564 * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
565 *
566 * @throws IllegalStateException
567 * if the PageFile is not loaded
568 */
569 public Iterator<Page> iterator() {
570 return (Iterator<Page>)iterator(false);
571 }
572
573 /**
574 * Allows you to iterate through all active Pages in this object. You can optionally include free pages in the pages
575 * iterated.
576 *
577 * @param includeFreePages - if true, free pages are included in the iteration
578 * @throws IllegalStateException
579 * if the PageFile is not loaded
580 */
581 public Iterator<Page> iterator(final boolean includeFreePages) {
582
583 pageFile.assertLoaded();
584
585 return new Iterator<Page>() {
586
587 long nextId;
588 Page nextPage;
589 Page lastPage;
590
591 private void findNextPage() {
592 if (!pageFile.isLoaded()) {
593 throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
594 }
595
596 if (nextPage != null) {
597 return;
598 }
599
600 try {
601 while (nextId < pageFile.getPageCount()) {
602
603 Page page = load(nextId, null);
604
605 if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
606 nextPage = page;
607 return;
608 } else {
609 nextId++;
610 }
611 }
612 } catch (IOException e) {
613 }
614 }
615
616 public boolean hasNext() {
617 findNextPage();
618 return nextPage != null;
619 }
620
621 public Page next() {
622 findNextPage();
623 if (nextPage != null) {
624 lastPage = nextPage;
625 nextPage = null;
626 nextId++;
627 return lastPage;
628 } else {
629 throw new NoSuchElementException();
630 }
631 }
632
633 @SuppressWarnings("unchecked")
634 public void remove() {
635 if (lastPage == null) {
636 throw new IllegalStateException();
637 }
638 try {
639 free(lastPage);
640 lastPage = null;
641 } catch (IOException e) {
642 throw new RuntimeException(e);
643 }
644 }
645 };
646 }
647
648 ///////////////////////////////////////////////////////////////////
649 // Commit / Rollback related methods..
650 ///////////////////////////////////////////////////////////////////
651
652 /**
653 * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
654 * with the transaction are written to disk or none will.
655 */
656 public void commit() throws IOException {
657 if( writeTransactionId!=-1 ) {
658 if (tmpFile != null) {
659 tmpFile.close();
660 pageFile.removeTmpFile(getTempFile());
661 tmpFile = null;
662 txFile = null;
663 }
664 // Actually do the page writes...
665 pageFile.write(writes.entrySet());
666 // Release the pages that were freed up in the transaction..
667 freePages(freeList);
668
669 freeList.clear();
670 allocateList.clear();
671 writes.clear();
672 writeTransactionId = -1;
673 }
674 size = 0;
675 }
676
677 /**
678 * Rolls back the transaction.
679 */
680 public void rollback() throws IOException {
681 if( writeTransactionId!=-1 ) {
682 if (tmpFile != null) {
683 tmpFile.close();
684 pageFile.removeTmpFile(getTempFile());
685 tmpFile = null;
686 txFile = null;
687 }
688 // Release the pages that were allocated in the transaction...
689 freePages(allocateList);
690
691 freeList.clear();
692 allocateList.clear();
693 writes.clear();
694 writeTransactionId = -1;
695 }
696 size = 0;
697 }
698
699 private long getWriteTransactionId() {
700 if( writeTransactionId==-1 ) {
701 writeTransactionId = pageFile.getNextWriteTransactionId();
702 }
703 return writeTransactionId;
704 }
705
706
707 protected File getTempFile() {
708 if (txFile == null) {
709 txFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName("tx-" + Long.toString(getWriteTransactionId()) + "-" + Long.toString(System.currentTimeMillis()) + ".tmp"));
710 }
711 return txFile;
712 }
713
714 /**
715 * Queues up a page write that should get done when commit() gets called.
716 */
717 private void write(final Page page, byte[] data) throws IOException {
718 Long key = page.getPageId();
719
720 // how much pages we have for this transaction
721 size = writes.size() * pageFile.getPageSize();
722
723 PageWrite write;
724
725 if (size > maxTransactionSize) {
726 if (tmpFile == null) {
727 tmpFile = new RandomAccessFile(getTempFile(), "rw");
728 }
729 long location = nextLocation;
730 tmpFile.seek(nextLocation);
731 tmpFile.write(data);
732 nextLocation = location + data.length;
733 write = new PageWrite(page, location, data.length, getTempFile());
734 } else {
735 write = new PageWrite(page, data);
736 }
737 writes.put(key, write);
738 }
739
740 /**
741 * @param list
742 * @throws RuntimeException
743 */
744 private void freePages(SequenceSet list) throws RuntimeException {
745 Sequence seq = list.getHead();
746 while( seq!=null ) {
747 seq.each(new Sequence.Closure<RuntimeException>(){
748 public void execute(long value) {
749 pageFile.freePage(value);
750 }
751 });
752 seq = seq.getNext();
753 }
754 }
755
756 /**
757 * @return true if there are no uncommitted page file updates associated with this transaction.
758 */
759 public boolean isReadOnly() {
760 return writeTransactionId==-1;
761 }
762
763 ///////////////////////////////////////////////////////////////////
764 // Transaction closure helpers...
765 ///////////////////////////////////////////////////////////////////
766
767 /**
768 * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
769 * If the closure throws an Exception, then the transaction is rolled back.
770 *
771 * @param <T>
772 * @param closure - the work to get exectued.
773 * @throws T if the closure throws it
774 * @throws IOException If the commit fails.
775 */
776 public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
777 boolean success = false;
778 try {
779 closure.execute(this);
780 success = true;
781 } finally {
782 if (success) {
783 commit();
784 } else {
785 rollback();
786 }
787 }
788 }
789
790 /**
791 * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
792 * If the closure throws an Exception, then the transaction is rolled back.
793 *
794 * @param <T>
795 * @param closure - the work to get exectued.
796 * @throws T if the closure throws it
797 * @throws IOException If the commit fails.
798 */
799 public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
800 boolean success = false;
801 try {
802 R rc = closure.execute(this);
803 success = true;
804 return rc;
805 } finally {
806 if (success) {
807 commit();
808 } else {
809 rollback();
810 }
811 }
812 }
813 }