001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.disk.page;
018
019import org.apache.activemq.store.kahadb.disk.page.PageFile.PageWrite;
020import org.apache.activemq.store.kahadb.disk.util.*;
021import org.apache.activemq.util.ByteSequence;
022import org.apache.activemq.store.kahadb.disk.util.DataByteArrayInputStream;
023import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
024import org.apache.activemq.util.IOHelper;
025
026import java.io.*;
027import java.util.Iterator;
028import java.util.NoSuchElementException;
029import java.util.TreeMap;
030
031/**
032 * The class used to read/update a PageFile object.  Using a transaction allows you to
033 * do multiple update operations in a single unit of work.
034 */
035public class Transaction implements Iterable<Page> {
036
037    private RandomAccessFile tmpFile;
038    private File txFile;
039    private long nextLocation = 0;
040
041    /**
042     * The PageOverflowIOException occurs when a page write is requested
043     * and it's data is larger than what would fit into a single page.
044     */
045    public class PageOverflowIOException extends IOException {
046        private static final long serialVersionUID = 1L;
047
048        public PageOverflowIOException(String message) {
049            super(message);
050        }
051    }
052
053    /**
054     * The InvalidPageIOException is thrown if try to load/store a a page
055     * with an invalid page id.
056     */
057    public class InvalidPageIOException extends IOException {
058        private static final long serialVersionUID = 1L;
059
060        private final long page;
061
062        public InvalidPageIOException(String message, long page) {
063            super(message);
064            this.page = page;
065        }
066
067        public long getPage() {
068            return page;
069        }
070    }
071
072    /**
073     * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
074     *
075     * @param <T> The type of exceptions that operation will throw.
076     */
077    public interface Closure <T extends Throwable> {
078        public void execute(Transaction tx) throws T;
079    }
080
081    /**
082     * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
083     *
084     * @param <R> The type of result that the closure produces.
085     * @param <T> The type of exceptions that operation will throw.
086     */
087    public interface CallableClosure<R, T extends Throwable> {
088        public R execute(Transaction tx) throws T;
089    }
090
091
092    // The page file that this Transaction operates against.
093    private final PageFile pageFile;
094    // If this transaction is updating stuff.. this is the tx of
095    private long writeTransactionId=-1;
096    // List of pages that this transaction has modified.
097    private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
098    // List of pages allocated in this transaction
099    private final SequenceSet allocateList = new SequenceSet();
100    // List of pages freed in this transaction
101    private final SequenceSet freeList = new SequenceSet();
102
103    private long maxTransactionSize = Long.getLong("maxKahaDBTxSize", 10485760L);
104
105    private long size = 0;
106
107    Transaction(PageFile pageFile) {
108        this.pageFile = pageFile;
109    }
110
111    /**
112     * @return the page file that created this Transaction
113     */
114    public PageFile getPageFile() {
115        return this.pageFile;
116    }
117
118    /**
119     * Allocates a free page that you can write data to.
120     *
121     * @return a newly allocated page.
122     * @throws IOException
123     *         If an disk error occurred.
124     * @throws IllegalStateException
125     *         if the PageFile is not loaded
126     */
127    public <T> Page<T> allocate() throws IOException {
128        return allocate(1);
129    }
130
131    /**
132     * Allocates a block of free pages that you can write data to.
133     *
134     * @param count the number of sequential pages to allocate
135     * @return the first page of the sequential set.
136     * @throws IOException
137     *         If an disk error occurred.
138     * @throws IllegalStateException
139     *         if the PageFile is not loaded
140     */
141    public <T> Page<T> allocate(int count) throws IOException {
142        Page<T> rc = pageFile.allocate(count);
143        allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
144        return rc;
145    }
146
147    /**
148     * Frees up a previously allocated page so that it can be re-allocated again.
149     *
150     * @param pageId the page to free up
151     * @throws IOException
152     *         If an disk error occurred.
153     * @throws IllegalStateException
154     *         if the PageFile is not loaded
155     */
156    public void free(long pageId) throws IOException {
157        free(load(pageId, null));
158    }
159
160    /**
161     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
162     *
163     * @param pageId the initial page of the sequence that will be getting freed
164     * @param count the number of pages in the sequence
165     *
166     * @throws IOException
167     *         If an disk error occurred.
168     * @throws IllegalStateException
169     *         if the PageFile is not loaded
170     */
171    public void free(long pageId, int count) throws IOException {
172        free(load(pageId, null), count);
173    }
174
175    /**
176     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
177     *
178     * @param page the initial page of the sequence that will be getting freed
179     * @param count the number of pages in the sequence
180     *
181     * @throws IOException
182     *         If an disk error occurred.
183     * @throws IllegalStateException
184     *         if the PageFile is not loaded
185     */
186    public <T> void free(Page<T> page, int count) throws IOException {
187        pageFile.assertLoaded();
188        long offsetPage = page.getPageId();
189        while (count-- > 0) {
190            if (page == null) {
191                page = load(offsetPage, null);
192            }
193            free(page);
194            page = null;
195            // Increment the offsetPage value since using it depends on the current count.
196            offsetPage++;
197        }
198    }
199
200    /**
201     * Frees up a previously allocated page so that it can be re-allocated again.
202     *
203     * @param page the page to free up
204     * @throws IOException
205     *         If an disk error occurred.
206     * @throws IllegalStateException
207     *         if the PageFile is not loaded
208     */
209    public <T> void free(Page<T> page) throws IOException {
210        pageFile.assertLoaded();
211
212        // We may need loop to free up a page chain.
213        while (page != null) {
214
215            // Is it already free??
216            if (page.getType() == Page.PAGE_FREE_TYPE) {
217                return;
218            }
219
220            Page<T> next = null;
221            if (page.getType() == Page.PAGE_PART_TYPE) {
222                next = load(page.getNext(), null);
223            }
224
225            page.makeFree(getWriteTransactionId());
226            // ensure free page is visible while write is pending
227            pageFile.addToCache(page.copy());
228
229            DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
230            page.write(out);
231            write(page, out.getData());
232
233            freeList.add(page.getPageId());
234            page = next;
235        }
236    }
237
238    /**
239     *
240     * @param page
241     *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
242     * @param marshaller
243     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
244     * @param overflow
245     *        If true, then if the page data marshalls to a bigger size than can fit in one page, then additional
246     *        overflow pages are automatically allocated and chained to this page to store all the data.  If false,
247     *        and the overflow condition would occur, then the PageOverflowIOException is thrown.
248     * @throws IOException
249     *         If an disk error occurred.
250     * @throws PageOverflowIOException
251     *         If the page data marshalls to size larger than maximum page size and overflow was false.
252     * @throws IllegalStateException
253     *         if the PageFile is not loaded
254     */
255    public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
256        DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
257        if (marshaller != null) {
258            marshaller.writePayload(page.get(), out);
259        }
260        out.close();
261    }
262
263    /**
264     * @throws IOException
265     */
266    public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
267        pageFile.assertLoaded();
268
269        // Copy to protect against the end user changing
270        // the page instance while we are doing a write.
271        final Page copy = page.copy();
272        pageFile.addToCache(copy);
273
274        //
275        // To support writing VERY large data, we override the output stream so
276        // that we
277        // we do the page writes incrementally while the data is being
278        // marshalled.
279        DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
280            Page current = copy;
281
282            @SuppressWarnings("unchecked")
283            @Override
284            protected void onWrite() throws IOException {
285
286                // Are we at an overflow condition?
287                final int pageSize = pageFile.getPageSize();
288                if (pos >= pageSize) {
289                    // If overflow is allowed
290                    if (overflow) {
291
292                        do {
293                            Page next;
294                            if (current.getType() == Page.PAGE_PART_TYPE) {
295                                next = load(current.getNext(), null);
296                            } else {
297                                next = allocate();
298                            }
299
300                            next.txId = current.txId;
301
302                            // Write the page header
303                            int oldPos = pos;
304                            pos = 0;
305
306                            current.makePagePart(next.getPageId(), getWriteTransactionId());
307                            current.write(this);
308
309                            // Do the page write..
310                            byte[] data = new byte[pageSize];
311                            System.arraycopy(buf, 0, data, 0, pageSize);
312                            Transaction.this.write(current, data);
313
314                            // make the new link visible
315                            pageFile.addToCache(current);
316
317                            // Reset for the next page chunk
318                            pos = 0;
319                            // The page header marshalled after the data is written.
320                            skip(Page.PAGE_HEADER_SIZE);
321                            // Move the overflow data after the header.
322                            System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
323                            pos += oldPos - pageSize;
324                            current = next;
325
326                        } while (pos > pageSize);
327                    } else {
328                        throw new PageOverflowIOException("Page overflow.");
329                    }
330                }
331
332            }
333
334            @Override
335            public void close() throws IOException {
336                super.close();
337
338                // We need to free up the rest of the page chain..
339                if (current.getType() == Page.PAGE_PART_TYPE) {
340                    free(current.getNext());
341                }
342
343                current.makePageEnd(pos, getWriteTransactionId());
344
345                // make visible as end page
346                pageFile.addToCache(current);
347
348                // Write the header..
349                pos = 0;
350                current.write(this);
351
352                Transaction.this.write(current, buf);
353            }
354        };
355
356        // The page header marshaled after the data is written.
357        out.skip(Page.PAGE_HEADER_SIZE);
358        return out;
359    }
360
361    /**
362     * Loads a page from disk.
363     *
364     * @param pageId
365     *        the id of the page to load
366     * @param marshaller
367     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
368     * @return The page with the given id
369     * @throws IOException
370     *         If an disk error occurred.
371     * @throws IllegalStateException
372     *         if the PageFile is not loaded
373     */
374    public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
375        pageFile.assertLoaded();
376        Page<T> page = new Page<T>(pageId);
377        load(page, marshaller);
378        return page;
379    }
380
381    /**
382     * Loads a page from disk.
383     *
384     * @param page - The pageId field must be properly set
385     * @param marshaller
386     *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
387     * @throws IOException
388     *         If an disk error occurred.
389     * @throws InvalidPageIOException
390     *         If the page is is not valid.
391     * @throws IllegalStateException
392     *         if the PageFile is not loaded
393     */
394    @SuppressWarnings("unchecked")
395    public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
396        pageFile.assertLoaded();
397
398        // Can't load invalid offsets...
399        long pageId = page.getPageId();
400        if (pageId < 0) {
401            throw new InvalidPageIOException("Page id is not valid", pageId);
402        }
403
404        // It might be a page this transaction has modified...
405        PageWrite update = writes.get(pageId);
406        if (update != null) {
407            page.copy(update.getPage());
408            return;
409        }
410
411        // We may be able to get it from the cache...
412        Page<T> t = pageFile.getFromCache(pageId);
413        if (t != null) {
414            page.copy(t);
415            return;
416        }
417
418        if (marshaller != null) {
419            // Full page read..
420            InputStream is = openInputStream(page);
421            DataInputStream dataIn = new DataInputStream(is);
422            page.set(marshaller.readPayload(dataIn));
423            is.close();
424        } else {
425            // Page header read.
426            DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
427            pageFile.readPage(pageId, in.getRawData());
428            page.read(in);
429            page.set(null);
430        }
431
432        // Cache it.
433        if (marshaller != null) {
434            pageFile.addToCache(page);
435        }
436    }
437
438    /**
439     * @see org.apache.activemq.store.kahadb.disk.page.Transaction#load(org.apache.activemq.store.kahadb.disk.page.Page,
440     *      org.apache.activemq.store.kahadb.disk.util.Marshaller)
441     */
442    public InputStream openInputStream(final Page p) throws IOException {
443
444        return new InputStream() {
445
446            private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
447            private Page page = readPage(p);
448            private int pageCount = 1;
449
450            private Page markPage;
451            private ByteSequence markChunk;
452
453            private Page readPage(Page page) throws IOException {
454                // Read the page data
455
456                pageFile.readPage(page.getPageId(), chunk.getData());
457
458                chunk.setOffset(0);
459                chunk.setLength(pageFile.getPageSize());
460
461                DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
462                page.read(in);
463
464                chunk.setOffset(Page.PAGE_HEADER_SIZE);
465                if (page.getType() == Page.PAGE_END_TYPE) {
466                    chunk.setLength((int)(page.getNext()));
467                }
468
469                if (page.getType() == Page.PAGE_FREE_TYPE) {
470                    throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free");
471                }
472
473                return page;
474            }
475
476            public int read() throws IOException {
477                if (!atEOF()) {
478                    return chunk.data[chunk.offset++] & 0xff;
479                } else {
480                    return -1;
481                }
482            }
483
484            private boolean atEOF() throws IOException {
485                if (chunk.offset < chunk.length) {
486                    return false;
487                }
488                if (page.getType() == Page.PAGE_END_TYPE) {
489                    return true;
490                }
491                fill();
492                return chunk.offset >= chunk.length;
493            }
494
495            private void fill() throws IOException {
496                page = readPage(new Page(page.getNext()));
497                pageCount++;
498            }
499
500            public int read(byte[] b) throws IOException {
501                return read(b, 0, b.length);
502            }
503
504            public int read(byte b[], int off, int len) throws IOException {
505                if (!atEOF()) {
506                    int rc = 0;
507                    while (!atEOF() && rc < len) {
508                        len = Math.min(len, chunk.length - chunk.offset);
509                        if (len > 0) {
510                            System.arraycopy(chunk.data, chunk.offset, b, off, len);
511                            chunk.offset += len;
512                        }
513                        rc += len;
514                    }
515                    return rc;
516                } else {
517                    return -1;
518                }
519            }
520
521            public long skip(long len) throws IOException {
522                if (atEOF()) {
523                    int rc = 0;
524                    while (!atEOF() && rc < len) {
525                        len = Math.min(len, chunk.length - chunk.offset);
526                        if (len > 0) {
527                            chunk.offset += len;
528                        }
529                        rc += len;
530                    }
531                    return rc;
532                } else {
533                    return -1;
534                }
535            }
536
537            public int available() {
538                return chunk.length - chunk.offset;
539            }
540
541            public boolean markSupported() {
542                return true;
543            }
544
545            public void mark(int markpos) {
546                markPage = page;
547                byte data[] = new byte[pageFile.getPageSize()];
548                System.arraycopy(chunk.getData(), 0, data, 0, pageFile.getPageSize());
549                markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
550            }
551
552            public void reset() {
553                page = markPage;
554                chunk = markChunk;
555            }
556
557        };
558    }
559
560    /**
561     * Allows you to iterate through all active Pages in this object.  Pages with type Page.FREE_TYPE are
562     * not included in this iteration.
563     *
564     * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
565     *
566     * @throws IllegalStateException
567     *         if the PageFile is not loaded
568     */
569    public Iterator<Page> iterator() {
570        return (Iterator<Page>)iterator(false);
571    }
572
573    /**
574     * Allows you to iterate through all active Pages in this object.  You can optionally include free pages in the pages
575     * iterated.
576     *
577     * @param includeFreePages - if true, free pages are included in the iteration
578     * @throws IllegalStateException
579     *         if the PageFile is not loaded
580     */
581    public Iterator<Page> iterator(final boolean includeFreePages) {
582
583        pageFile.assertLoaded();
584
585        return new Iterator<Page>() {
586
587            long nextId;
588            Page nextPage;
589            Page lastPage;
590
591            private void findNextPage() {
592                if (!pageFile.isLoaded()) {
593                    throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
594                }
595
596                if (nextPage != null) {
597                    return;
598                }
599
600                try {
601                    while (nextId < pageFile.getPageCount()) {
602
603                        Page page = load(nextId, null);
604
605                        if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
606                            nextPage = page;
607                            return;
608                        } else {
609                            nextId++;
610                        }
611                    }
612                } catch (IOException e) {
613                }
614            }
615
616            public boolean hasNext() {
617                findNextPage();
618                return nextPage != null;
619            }
620
621            public Page next() {
622                findNextPage();
623                if (nextPage != null) {
624                    lastPage = nextPage;
625                    nextPage = null;
626                    nextId++;
627                    return lastPage;
628                } else {
629                    throw new NoSuchElementException();
630                }
631            }
632
633            @SuppressWarnings("unchecked")
634            public void remove() {
635                if (lastPage == null) {
636                    throw new IllegalStateException();
637                }
638                try {
639                    free(lastPage);
640                    lastPage = null;
641                } catch (IOException e) {
642                    throw new RuntimeException(e);
643                }
644            }
645        };
646    }
647
648    ///////////////////////////////////////////////////////////////////
649    // Commit / Rollback related methods..
650    ///////////////////////////////////////////////////////////////////
651
652    /**
653     * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
654     * with the transaction are written to disk or none will.
655     */
656    public void commit() throws IOException {
657        if( writeTransactionId!=-1 ) {
658            if (tmpFile != null) {
659                tmpFile.close();
660                pageFile.removeTmpFile(getTempFile());
661                tmpFile = null;
662                txFile = null;
663            }
664            // Actually do the page writes...
665            pageFile.write(writes.entrySet());
666            // Release the pages that were freed up in the transaction..
667            freePages(freeList);
668
669            freeList.clear();
670            allocateList.clear();
671            writes.clear();
672            writeTransactionId = -1;
673        }
674        size = 0;
675    }
676
677    /**
678     * Rolls back the transaction.
679     */
680    public void rollback() throws IOException {
681        if( writeTransactionId!=-1 ) {
682            if (tmpFile != null) {
683                tmpFile.close();
684                pageFile.removeTmpFile(getTempFile());
685                tmpFile = null;
686                txFile = null;
687            }
688            // Release the pages that were allocated in the transaction...
689            freePages(allocateList);
690
691            freeList.clear();
692            allocateList.clear();
693            writes.clear();
694            writeTransactionId = -1;
695        }
696        size = 0;
697    }
698
699    private long getWriteTransactionId() {
700        if( writeTransactionId==-1 ) {
701            writeTransactionId = pageFile.getNextWriteTransactionId();
702        }
703        return writeTransactionId;
704    }
705
706
707    protected File getTempFile() {
708        if (txFile == null) {
709            txFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName("tx-" + Long.toString(getWriteTransactionId()) + "-" + Long.toString(System.currentTimeMillis()) + ".tmp"));
710        }
711       return txFile;
712    }
713
714    /**
715     * Queues up a page write that should get done when commit() gets called.
716     */
717    private void write(final Page page, byte[] data) throws IOException {
718        Long key = page.getPageId();
719
720        // how much pages we have for this transaction
721        size = writes.size() * pageFile.getPageSize();
722
723        PageWrite write;
724
725        if (size > maxTransactionSize) {
726            if (tmpFile == null) {
727                tmpFile = new RandomAccessFile(getTempFile(), "rw");
728            }
729            long location = nextLocation;
730            tmpFile.seek(nextLocation);
731            tmpFile.write(data);
732            nextLocation = location + data.length;
733            write = new PageWrite(page, location, data.length, getTempFile());
734        } else {
735            write = new PageWrite(page, data);
736        }
737        writes.put(key, write);
738    }
739
740    /**
741     * @param list
742     * @throws RuntimeException
743     */
744    private void freePages(SequenceSet list) throws RuntimeException {
745        Sequence seq = list.getHead();
746        while( seq!=null ) {
747            seq.each(new Sequence.Closure<RuntimeException>(){
748                public void execute(long value) {
749                    pageFile.freePage(value);
750                }
751            });
752            seq = seq.getNext();
753        }
754    }
755
756    /**
757     * @return true if there are no uncommitted page file updates associated with this transaction.
758     */
759    public boolean isReadOnly() {
760        return writeTransactionId==-1;
761    }
762
763    ///////////////////////////////////////////////////////////////////
764    // Transaction closure helpers...
765    ///////////////////////////////////////////////////////////////////
766
767    /**
768     * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
769     * If the closure throws an Exception, then the transaction is rolled back.
770     *
771     * @param <T>
772     * @param closure - the work to get exectued.
773     * @throws T if the closure throws it
774     * @throws IOException If the commit fails.
775     */
776    public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
777        boolean success = false;
778        try {
779            closure.execute(this);
780            success = true;
781        } finally {
782            if (success) {
783                commit();
784            } else {
785                rollback();
786            }
787        }
788    }
789
790    /**
791     * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
792     * If the closure throws an Exception, then the transaction is rolled back.
793     *
794     * @param <T>
795     * @param closure - the work to get exectued.
796     * @throws T if the closure throws it
797     * @throws IOException If the commit fails.
798     */
799    public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
800        boolean success = false;
801        try {
802            R rc = closure.execute(this);
803            success = true;
804            return rc;
805        } finally {
806            if (success) {
807                commit();
808            } else {
809                rollback();
810            }
811        }
812    }
813}