001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.disk.page;
018    
019    import org.apache.activemq.store.kahadb.disk.page.PageFile.PageWrite;
020    import org.apache.activemq.store.kahadb.disk.util.*;
021    import org.apache.activemq.util.ByteSequence;
022    import org.apache.activemq.store.kahadb.disk.util.DataByteArrayInputStream;
023    import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
024    import org.apache.activemq.util.IOHelper;
025    
026    import java.io.*;
027    import java.util.Iterator;
028    import java.util.NoSuchElementException;
029    import java.util.TreeMap;
030    
031    /**
032     * The class used to read/update a PageFile object.  Using a transaction allows you to
033     * do multiple update operations in a single unit of work.
034     */
035    public class Transaction implements Iterable<Page> {
036    
037        private RandomAccessFile tmpFile;
038        private File txFile;
039        private long nextLocation = 0;
040    
041        /**
042         * The PageOverflowIOException occurs when a page write is requested
043         * and it's data is larger than what would fit into a single page.
044         */
045        public class PageOverflowIOException extends IOException {
046            private static final long serialVersionUID = 1L;
047    
048            public PageOverflowIOException(String message) {
049                super(message);
050            }
051        }
052    
053        /**
054         * The InvalidPageIOException is thrown if try to load/store a a page
055         * with an invalid page id.
056         */
057        public class InvalidPageIOException extends IOException {
058            private static final long serialVersionUID = 1L;
059    
060            private final long page;
061    
062            public InvalidPageIOException(String message, long page) {
063                super(message);
064                this.page = page;
065            }
066    
067            public long getPage() {
068                return page;
069            }
070        }
071    
072        /**
073         * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
074         *
075         * @param <T> The type of exceptions that operation will throw.
076         */
077        public interface Closure <T extends Throwable> {
078            public void execute(Transaction tx) throws T;
079        }
080    
081        /**
082         * This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
083         *
084         * @param <R> The type of result that the closure produces.
085         * @param <T> The type of exceptions that operation will throw.
086         */
087        public interface CallableClosure<R, T extends Throwable> {
088            public R execute(Transaction tx) throws T;
089        }
090    
091    
092        // The page file that this Transaction operates against.
093        private final PageFile pageFile;
094        // If this transaction is updating stuff.. this is the tx of
095        private long writeTransactionId=-1;
096        // List of pages that this transaction has modified.
097        private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
098        // List of pages allocated in this transaction
099        private final SequenceSet allocateList = new SequenceSet();
100        // List of pages freed in this transaction
101        private final SequenceSet freeList = new SequenceSet();
102    
103        private long maxTransactionSize = Long.getLong("maxKahaDBTxSize", 10485760L);
104    
105        private long size = 0;
106    
107        Transaction(PageFile pageFile) {
108            this.pageFile = pageFile;
109        }
110    
111        /**
112         * @return the page file that created this Transaction
113         */
114        public PageFile getPageFile() {
115            return this.pageFile;
116        }
117    
118        /**
119         * Allocates a free page that you can write data to.
120         *
121         * @return a newly allocated page.
122         * @throws IOException
123         *         If an disk error occurred.
124         * @throws IllegalStateException
125         *         if the PageFile is not loaded
126         */
127        public <T> Page<T> allocate() throws IOException {
128            return allocate(1);
129        }
130    
131        /**
132         * Allocates a block of free pages that you can write data to.
133         *
134         * @param count the number of sequential pages to allocate
135         * @return the first page of the sequential set.
136         * @throws IOException
137         *         If an disk error occurred.
138         * @throws IllegalStateException
139         *         if the PageFile is not loaded
140         */
141        public <T> Page<T> allocate(int count) throws IOException {
142            Page<T> rc = pageFile.allocate(count);
143            allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
144            return rc;
145        }
146    
147        /**
148         * Frees up a previously allocated page so that it can be re-allocated again.
149         *
150         * @param pageId the page to free up
151         * @throws IOException
152         *         If an disk error occurred.
153         * @throws IllegalStateException
154         *         if the PageFile is not loaded
155         */
156        public void free(long pageId) throws IOException {
157            free(load(pageId, null));
158        }
159    
160        /**
161         * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
162         *
163         * @param pageId the initial page of the sequence that will be getting freed
164         * @param count the number of pages in the sequence
165         *
166         * @throws IOException
167         *         If an disk error occurred.
168         * @throws IllegalStateException
169         *         if the PageFile is not loaded
170         */
171        public void free(long pageId, int count) throws IOException {
172            free(load(pageId, null), count);
173        }
174    
175        /**
176         * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
177         *
178         * @param page the initial page of the sequence that will be getting freed
179         * @param count the number of pages in the sequence
180         *
181         * @throws IOException
182         *         If an disk error occurred.
183         * @throws IllegalStateException
184         *         if the PageFile is not loaded
185         */
186        public <T> void free(Page<T> page, int count) throws IOException {
187            pageFile.assertLoaded();
188            long offsetPage = page.getPageId();
189            while (count-- > 0) {
190                if (page == null) {
191                    page = load(offsetPage, null);
192                }
193                free(page);
194                page = null;
195                // Increment the offsetPage value since using it depends on the current count.
196                offsetPage++;
197            }
198        }
199    
200        /**
201         * Frees up a previously allocated page so that it can be re-allocated again.
202         *
203         * @param page the page to free up
204         * @throws IOException
205         *         If an disk error occurred.
206         * @throws IllegalStateException
207         *         if the PageFile is not loaded
208         */
209        public <T> void free(Page<T> page) throws IOException {
210            pageFile.assertLoaded();
211    
212            // We may need loop to free up a page chain.
213            while (page != null) {
214    
215                // Is it already free??
216                if (page.getType() == Page.PAGE_FREE_TYPE) {
217                    return;
218                }
219    
220                Page<T> next = null;
221                if (page.getType() == Page.PAGE_PART_TYPE) {
222                    next = load(page.getNext(), null);
223                }
224    
225                page.makeFree(getWriteTransactionId());
226                // ensure free page is visible while write is pending
227                pageFile.addToCache(page.copy());
228    
229                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
230                page.write(out);
231                write(page, out.getData());
232    
233                freeList.add(page.getPageId());
234                page = next;
235            }
236        }
237    
238        /**
239         *
240         * @param page
241         *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
242         * @param marshaller
243         *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
244         * @param overflow
245         *        If true, then if the page data marshalls to a bigger size than can fit in one page, then additional
246         *        overflow pages are automatically allocated and chained to this page to store all the data.  If false,
247         *        and the overflow condition would occur, then the PageOverflowIOException is thrown.
248         * @throws IOException
249         *         If an disk error occurred.
250         * @throws PageOverflowIOException
251         *         If the page data marshalls to size larger than maximum page size and overflow was false.
252         * @throws IllegalStateException
253         *         if the PageFile is not loaded
254         */
255        public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
256            DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
257            if (marshaller != null) {
258                marshaller.writePayload(page.get(), out);
259            }
260            out.close();
261        }
262    
263        /**
264         * @throws IOException
265         */
266        public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
267            pageFile.assertLoaded();
268    
269            // Copy to protect against the end user changing
270            // the page instance while we are doing a write.
271            final Page copy = page.copy();
272            pageFile.addToCache(copy);
273    
274            //
275            // To support writing VERY large data, we override the output stream so
276            // that we
277            // we do the page writes incrementally while the data is being
278            // marshalled.
279            DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
280                Page current = copy;
281    
282                @SuppressWarnings("unchecked")
283                @Override
284                protected void onWrite() throws IOException {
285    
286                    // Are we at an overflow condition?
287                    final int pageSize = pageFile.getPageSize();
288                    if (pos >= pageSize) {
289                        // If overflow is allowed
290                        if (overflow) {
291    
292                            do {
293                                Page next;
294                                if (current.getType() == Page.PAGE_PART_TYPE) {
295                                    next = load(current.getNext(), null);
296                                } else {
297                                    next = allocate();
298                                }
299    
300                                next.txId = current.txId;
301    
302                                // Write the page header
303                                int oldPos = pos;
304                                pos = 0;
305    
306                                current.makePagePart(next.getPageId(), getWriteTransactionId());
307                                current.write(this);
308    
309                                // Do the page write..
310                                byte[] data = new byte[pageSize];
311                                System.arraycopy(buf, 0, data, 0, pageSize);
312                                Transaction.this.write(current, data);
313    
314                                // make the new link visible
315                                pageFile.addToCache(current);
316    
317                                // Reset for the next page chunk
318                                pos = 0;
319                                // The page header marshalled after the data is written.
320                                skip(Page.PAGE_HEADER_SIZE);
321                                // Move the overflow data after the header.
322                                System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
323                                pos += oldPos - pageSize;
324                                current = next;
325    
326                            } while (pos > pageSize);
327                        } else {
328                            throw new PageOverflowIOException("Page overflow.");
329                        }
330                    }
331    
332                }
333    
334                @Override
335                public void close() throws IOException {
336                    super.close();
337    
338                    // We need to free up the rest of the page chain..
339                    if (current.getType() == Page.PAGE_PART_TYPE) {
340                        free(current.getNext());
341                    }
342    
343                    current.makePageEnd(pos, getWriteTransactionId());
344    
345                    // make visible as end page
346                    pageFile.addToCache(current);
347    
348                    // Write the header..
349                    pos = 0;
350                    current.write(this);
351    
352                    Transaction.this.write(current, buf);
353                }
354            };
355    
356            // The page header marshaled after the data is written.
357            out.skip(Page.PAGE_HEADER_SIZE);
358            return out;
359        }
360    
361        /**
362         * Loads a page from disk.
363         *
364         * @param pageId
365         *        the id of the page to load
366         * @param marshaller
367         *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
368         * @return The page with the given id
369         * @throws IOException
370         *         If an disk error occurred.
371         * @throws IllegalStateException
372         *         if the PageFile is not loaded
373         */
374        public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
375            pageFile.assertLoaded();
376            Page<T> page = new Page<T>(pageId);
377            load(page, marshaller);
378            return page;
379        }
380    
381        /**
382         * Loads a page from disk.
383         *
384         * @param page - The pageId field must be properly set
385         * @param marshaller
386         *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
387         * @throws IOException
388         *         If an disk error occurred.
389         * @throws InvalidPageIOException
390         *         If the page is is not valid.
391         * @throws IllegalStateException
392         *         if the PageFile is not loaded
393         */
394        @SuppressWarnings("unchecked")
395        public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
396            pageFile.assertLoaded();
397    
398            // Can't load invalid offsets...
399            long pageId = page.getPageId();
400            if (pageId < 0) {
401                throw new InvalidPageIOException("Page id is not valid", pageId);
402            }
403    
404            // It might be a page this transaction has modified...
405            PageWrite update = writes.get(pageId);
406            if (update != null) {
407                page.copy(update.getPage());
408                return;
409            }
410    
411            // We may be able to get it from the cache...
412            Page<T> t = pageFile.getFromCache(pageId);
413            if (t != null) {
414                page.copy(t);
415                return;
416            }
417    
418            if (marshaller != null) {
419                // Full page read..
420                InputStream is = openInputStream(page);
421                DataInputStream dataIn = new DataInputStream(is);
422                page.set(marshaller.readPayload(dataIn));
423                is.close();
424            } else {
425                // Page header read.
426                DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
427                pageFile.readPage(pageId, in.getRawData());
428                page.read(in);
429                page.set(null);
430            }
431    
432            // Cache it.
433            if (marshaller != null) {
434                pageFile.addToCache(page);
435            }
436        }
437    
438        /**
439         * @see org.apache.activemq.store.kahadb.disk.page.Transaction#load(org.apache.activemq.store.kahadb.disk.page.Page,
440         *      org.apache.activemq.store.kahadb.disk.util.Marshaller)
441         */
442        public InputStream openInputStream(final Page p) throws IOException {
443    
444            return new InputStream() {
445    
446                private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
447                private Page page = readPage(p);
448                private int pageCount = 1;
449    
450                private Page markPage;
451                private ByteSequence markChunk;
452    
453                private Page readPage(Page page) throws IOException {
454                    // Read the page data
455    
456                    pageFile.readPage(page.getPageId(), chunk.getData());
457    
458                    chunk.setOffset(0);
459                    chunk.setLength(pageFile.getPageSize());
460    
461                    DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
462                    page.read(in);
463    
464                    chunk.setOffset(Page.PAGE_HEADER_SIZE);
465                    if (page.getType() == Page.PAGE_END_TYPE) {
466                        chunk.setLength((int)(page.getNext()));
467                    }
468    
469                    if (page.getType() == Page.PAGE_FREE_TYPE) {
470                        throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free");
471                    }
472    
473                    return page;
474                }
475    
476                public int read() throws IOException {
477                    if (!atEOF()) {
478                        return chunk.data[chunk.offset++] & 0xff;
479                    } else {
480                        return -1;
481                    }
482                }
483    
484                private boolean atEOF() throws IOException {
485                    if (chunk.offset < chunk.length) {
486                        return false;
487                    }
488                    if (page.getType() == Page.PAGE_END_TYPE) {
489                        return true;
490                    }
491                    fill();
492                    return chunk.offset >= chunk.length;
493                }
494    
495                private void fill() throws IOException {
496                    page = readPage(new Page(page.getNext()));
497                    pageCount++;
498                }
499    
500                public int read(byte[] b) throws IOException {
501                    return read(b, 0, b.length);
502                }
503    
504                public int read(byte b[], int off, int len) throws IOException {
505                    if (!atEOF()) {
506                        int rc = 0;
507                        while (!atEOF() && rc < len) {
508                            len = Math.min(len, chunk.length - chunk.offset);
509                            if (len > 0) {
510                                System.arraycopy(chunk.data, chunk.offset, b, off, len);
511                                chunk.offset += len;
512                            }
513                            rc += len;
514                        }
515                        return rc;
516                    } else {
517                        return -1;
518                    }
519                }
520    
521                public long skip(long len) throws IOException {
522                    if (atEOF()) {
523                        int rc = 0;
524                        while (!atEOF() && rc < len) {
525                            len = Math.min(len, chunk.length - chunk.offset);
526                            if (len > 0) {
527                                chunk.offset += len;
528                            }
529                            rc += len;
530                        }
531                        return rc;
532                    } else {
533                        return -1;
534                    }
535                }
536    
537                public int available() {
538                    return chunk.length - chunk.offset;
539                }
540    
541                public boolean markSupported() {
542                    return true;
543                }
544    
545                public void mark(int markpos) {
546                    markPage = page;
547                    byte data[] = new byte[pageFile.getPageSize()];
548                    System.arraycopy(chunk.getData(), 0, data, 0, pageFile.getPageSize());
549                    markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
550                }
551    
552                public void reset() {
553                    page = markPage;
554                    chunk = markChunk;
555                }
556    
557            };
558        }
559    
560        /**
561         * Allows you to iterate through all active Pages in this object.  Pages with type Page.FREE_TYPE are
562         * not included in this iteration.
563         *
564         * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
565         *
566         * @throws IllegalStateException
567         *         if the PageFile is not loaded
568         */
569        public Iterator<Page> iterator() {
570            return (Iterator<Page>)iterator(false);
571        }
572    
573        /**
574         * Allows you to iterate through all active Pages in this object.  You can optionally include free pages in the pages
575         * iterated.
576         *
577         * @param includeFreePages - if true, free pages are included in the iteration
578         * @throws IllegalStateException
579         *         if the PageFile is not loaded
580         */
581        public Iterator<Page> iterator(final boolean includeFreePages) {
582    
583            pageFile.assertLoaded();
584    
585            return new Iterator<Page>() {
586    
587                long nextId;
588                Page nextPage;
589                Page lastPage;
590    
591                private void findNextPage() {
592                    if (!pageFile.isLoaded()) {
593                        throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
594                    }
595    
596                    if (nextPage != null) {
597                        return;
598                    }
599    
600                    try {
601                        while (nextId < pageFile.getPageCount()) {
602    
603                            Page page = load(nextId, null);
604    
605                            if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
606                                nextPage = page;
607                                return;
608                            } else {
609                                nextId++;
610                            }
611                        }
612                    } catch (IOException e) {
613                    }
614                }
615    
616                public boolean hasNext() {
617                    findNextPage();
618                    return nextPage != null;
619                }
620    
621                public Page next() {
622                    findNextPage();
623                    if (nextPage != null) {
624                        lastPage = nextPage;
625                        nextPage = null;
626                        nextId++;
627                        return lastPage;
628                    } else {
629                        throw new NoSuchElementException();
630                    }
631                }
632    
633                @SuppressWarnings("unchecked")
634                public void remove() {
635                    if (lastPage == null) {
636                        throw new IllegalStateException();
637                    }
638                    try {
639                        free(lastPage);
640                        lastPage = null;
641                    } catch (IOException e) {
642                        throw new RuntimeException(e);
643                    }
644                }
645            };
646        }
647    
648        ///////////////////////////////////////////////////////////////////
649        // Commit / Rollback related methods..
650        ///////////////////////////////////////////////////////////////////
651    
652        /**
653         * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
654         * with the transaction are written to disk or none will.
655         */
656        public void commit() throws IOException {
657            if( writeTransactionId!=-1 ) {
658                if (tmpFile != null) {
659                    tmpFile.close();
660                    pageFile.removeTmpFile(getTempFile());
661                    tmpFile = null;
662                    txFile = null;
663                }
664                // Actually do the page writes...
665                pageFile.write(writes.entrySet());
666                // Release the pages that were freed up in the transaction..
667                freePages(freeList);
668    
669                freeList.clear();
670                allocateList.clear();
671                writes.clear();
672                writeTransactionId = -1;
673            }
674            size = 0;
675        }
676    
677        /**
678         * Rolls back the transaction.
679         */
680        public void rollback() throws IOException {
681            if( writeTransactionId!=-1 ) {
682                if (tmpFile != null) {
683                    tmpFile.close();
684                    pageFile.removeTmpFile(getTempFile());
685                    tmpFile = null;
686                    txFile = null;
687                }
688                // Release the pages that were allocated in the transaction...
689                freePages(allocateList);
690    
691                freeList.clear();
692                allocateList.clear();
693                writes.clear();
694                writeTransactionId = -1;
695            }
696            size = 0;
697        }
698    
699        private long getWriteTransactionId() {
700            if( writeTransactionId==-1 ) {
701                writeTransactionId = pageFile.getNextWriteTransactionId();
702            }
703            return writeTransactionId;
704        }
705    
706    
707        protected File getTempFile() {
708            if (txFile == null) {
709                txFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName("tx-" + Long.toString(getWriteTransactionId()) + "-" + Long.toString(System.currentTimeMillis()) + ".tmp"));
710            }
711           return txFile;
712        }
713    
714        /**
715         * Queues up a page write that should get done when commit() gets called.
716         */
717        private void write(final Page page, byte[] data) throws IOException {
718            Long key = page.getPageId();
719    
720            // how much pages we have for this transaction
721            size = writes.size() * pageFile.getPageSize();
722    
723            PageWrite write;
724    
725            if (size > maxTransactionSize) {
726                if (tmpFile == null) {
727                    tmpFile = new RandomAccessFile(getTempFile(), "rw");
728                }
729                long location = nextLocation;
730                tmpFile.seek(nextLocation);
731                tmpFile.write(data);
732                nextLocation = location + data.length;
733                write = new PageWrite(page, location, data.length, getTempFile());
734            } else {
735                write = new PageWrite(page, data);
736            }
737            writes.put(key, write);
738        }
739    
740        /**
741         * @param list
742         * @throws RuntimeException
743         */
744        private void freePages(SequenceSet list) throws RuntimeException {
745            Sequence seq = list.getHead();
746            while( seq!=null ) {
747                seq.each(new Sequence.Closure<RuntimeException>(){
748                    public void execute(long value) {
749                        pageFile.freePage(value);
750                    }
751                });
752                seq = seq.getNext();
753            }
754        }
755    
756        /**
757         * @return true if there are no uncommitted page file updates associated with this transaction.
758         */
759        public boolean isReadOnly() {
760            return writeTransactionId==-1;
761        }
762    
763        ///////////////////////////////////////////////////////////////////
764        // Transaction closure helpers...
765        ///////////////////////////////////////////////////////////////////
766    
767        /**
768         * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
769         * If the closure throws an Exception, then the transaction is rolled back.
770         *
771         * @param <T>
772         * @param closure - the work to get exectued.
773         * @throws T if the closure throws it
774         * @throws IOException If the commit fails.
775         */
776        public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
777            boolean success = false;
778            try {
779                closure.execute(this);
780                success = true;
781            } finally {
782                if (success) {
783                    commit();
784                } else {
785                    rollback();
786                }
787            }
788        }
789    
790        /**
791         * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
792         * If the closure throws an Exception, then the transaction is rolled back.
793         *
794         * @param <T>
795         * @param closure - the work to get exectued.
796         * @throws T if the closure throws it
797         * @throws IOException If the commit fails.
798         */
799        public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
800            boolean success = false;
801            try {
802                R rc = closure.execute(this);
803                success = true;
804                return rc;
805            } finally {
806                if (success) {
807                    commit();
808                } else {
809                    rollback();
810                }
811            }
812        }
813    }