/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the set of data files that make up the journal.
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int MAX_BATCH_SIZE = 32*1024*1024;

    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // A batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
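
    // On-disk layout of a complete batch control record (the header is built by
    // createBatchControlRecordHeader(); the trailing size and checksum are validated in checkBatchRecord()):
    //   4 byte record length | 1 byte record type | "WRITE BATCH" magic | 4 byte batch size | 8 byte Adler32 checksum of the batch payload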

    // Tackle corruption found when the checksum is disabled or the data is corrupted with zeros, minimising data loss.
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - skip to the next batch record or EOF
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
            LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset((int) sequence.getLast() + 1);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);

        } catch (IOException e) {
            // best effort - if the scan for the next batch fails we leave the recovery position unchanged
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public enum PreallocationStrategy {
        SPARSE_FILE,
        OS_KERNEL_COPY,
        ZEROS;
    }

    public enum PreallocationScope {
        ENTIRE_JOURNAL;
    }

    private static byte[] createBatchControlRecordHeader() {
        try {
            DataByteArrayOutputStream os = new DataByteArrayOutputStream();
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int PREFERED_DIFF = 1024 * 512;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive;
    private boolean directoryArchiveOverridden = false;

    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected Runnable cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private Timer timer;
    private int nextDataFileId = 1;

    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;

    public interface DataFileRemovedListener {
        void fileRemoved(DataFile datafile);
    }

    private DataFileRemovedListener dataFileRemovedListener;

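    /**
     * Opens the journal: scans the directory for existing data files that match the
     * configured prefix and suffix, links them in id order, optionally runs a
     * corruption check, restores the last append location and schedules the periodic
     * cleanup task.
     */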
    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the naming pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        nextDataFileId = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;

        getOrCreateCurrentWriteFile();

        if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) {
            LOG.warn("You are using a preallocation strategy with a non-default journal maxFileLength; benchmark this combination to avoid introducing unexpected latencies.");
        }

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
        }

        cleanupTask = new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        };
        this.timer = new Timer("KahaDB Scheduler", true);
        TimerTask task = new SchedulerTimerTask(cleanupTask);
        this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL);
        long end = System.currentTimeMillis();
        LOG.trace("Startup took: " + (end - start) + " ms");
    }

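    /**
     * Preallocates a new journal data file up to maxFileLength using the configured
     * strategy (sparse file, OS kernel copy or explicit zero fill). Only applies when
     * the preallocation scope is ENTIRE_JOURNAL.
     */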
    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {

        if (PreallocationScope.ENTIRE_JOURNAL == preallocationScope) {

            if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                doPreallocationKernelCopy(file);
            } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                doPreallocationZeros(file);
            } else {
                doPreallocationSparseFile(file);
            }
        } else {
            LOG.info("Using journal preallocation scope of batch allocation");
        }
    }

    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        try {
            file.seek(maxFileLength - 1);
            file.write((byte) 0x00);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file! Will continue without preallocation", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
        for (int i = 0; i < maxFileLength; i++) {
            buffer.put((byte) 0x00);
        }
        buffer.flip();

        try {
            FileChannel channel = file.getChannel();
            channel.write(buffer);
            channel.force(false);
            channel.position(0);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {

        // create a template file that will be used to pre-allocate the journal files
        File templateFile = createJournalTemplateFile();

        RandomAccessFile templateRaf = null;
        try {
            templateRaf = new RandomAccessFile(templateFile, "rw");
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
            templateRaf.close();
            templateFile.delete();
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + templateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + templateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        if (rc.exists()) {
            LOG.debug("Deleting journal template file {} because it already exists", rc);
            rc.delete();
        }
        return rc;
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

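    /**
     * Walks a data file from offset 0, validating each write batch with
     * checkBatchRecord(). Invalid regions are recorded as corrupted blocks and skipped
     * by scanning for the next batch header. Returns the location just past the last
     * valid batch, which is used as the recovered append position.
     */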
    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            while (true) {
                int size = checkBatchRecord(reader, location.getOffset());
                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {

                    // Perhaps it's just some corruption... scan through the
                    // file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
                    if (nextOffset >= 0) {
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException e) {
            // a read failure (typically EOF) ends the scan; use what has been validated so far
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

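    /**
     * Scans forward from the given offset in 4KB chunks looking for the next
     * BATCH_CONTROL_RECORD_HEADER. Returns the absolute offset of the header, or -1
     * if end of file is reached first.
     */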
    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        byte data[] = new byte[1024 * 4];
        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));

        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, pos);
            if (pos >= 0) {
                return offset + pos;
            } else {
                // need to load the next data chunk in...
                if (bs.length != data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
                bs = new ByteSequence(data, 0, reader.read(offset, data));
                pos = 0;
            }
        }
    }

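    /**
     * Validates the batch control record at the given offset: verifies the header
     * bytes, rejects batches larger than MAX_BATCH_SIZE and, when checksumming is
     * enabled, checks the Adler32 checksum of the batch payload. Returns the batch
     * size on success, or -1 if the record is not a valid batch.
     */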
    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
        DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);

        reader.readFully(offset, controlRecord);

        // Assert that it's a batch record.
        for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
            if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                return -1;
            }
        }

        int size = controlIs.readInt();
        if (size > MAX_BATCH_SIZE) {
            return -1;
        }

        if (isChecksum()) {

            long expectedChecksum = controlIs.readLong();
            if (expectedChecksum == 0) {
                // Checksumming was not enabled when the record was stored.
                // we can't validate the record :(
                return size;
            }

            byte data[] = new byte[size];
            reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);

            Checksum checksum = new Adler32();
            checksum.update(data, 0, data.length);

            if (expectedChecksum != checksum.getValue()) {
                return -1;
            }
        }
        return size;
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }

    synchronized DataFile getOrCreateCurrentWriteFile() throws IOException {
        if (dataFiles.isEmpty()) {
            rotateWriteFile();
        }

        DataFile current = dataFiles.getTail();

        if (current != null) {
            return current;
        } else {
            return rotateWriteFile();
        }
    }

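    /**
     * Allocates the next data file id, registers a new DataFile for it and appends it
     * to the tail of the data file list, making it the current write file.
     */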
    synchronized DataFile rotateWriteFile() {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
        fileByFileMap.put(file, nextWriteFile);
        dataFiles.addLast(nextWriteFile);
        return nextWriteFile;
    }

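    /**
     * Reserves a data file with the next id without making it the current write file;
     * it is linked just before the tail so appends continue to go to the existing
     * write file.
     */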
    public synchronized DataFile reserveDataFile() {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile reservedDataFile = new DataFile(file, nextNum);
        fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
        fileByFileMap.put(file, reservedDataFile);
        if (dataFiles.isEmpty()) {
            dataFiles.addLast(reservedDataFile);
        } else {
            dataFiles.getTail().linkBefore(reservedDataFile);
        }
        return reservedDataFile;
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    synchronized DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = fileMap.get(key);
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    synchronized File getFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = fileMap.get(key);
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile.getFile();
    }

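    /**
     * Stops the cleanup timer, closes the accessor pool and the appender and clears
     * the in-memory file maps. The appender is closed outside the journal lock to
     * avoid blocking on callbacks into the journal (AMQ-5620).
     */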
    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            if (this.timer != null) {
                this.timer.cancel();
            }
            accessorPool.close();
        }
        // the appender can be calling back to the journal, blocking a close - AMQ-5620
        appender.close();
        synchronized (this) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    protected synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

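    /**
     * Deletes all journal data files and resets the in-memory state, then recreates a
     * fresh accessor pool and appender. Returns false if any data file could not be
     * deleted.
     */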
    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            result &= dataFile.delete();
        }

        totalLength.set(0);
        fileMap.clear();
        fileByFileMap.clear();
        lastAppendLocation.set(null);
        dataFiles = new LinkedNodeList<DataFile>();

        // reopen file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

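    /**
     * Removes (deletes or archives) the data files with the given ids, skipping the
     * file currently being written to and any later files.
     */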
    public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = fileMap.get(key);
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        fileByFileMap.remove(dataFile.getFile());
        fileMap.remove(dataFile.getDataFileId());
        totalLength.addAndGet(-dataFile.getLength());
        dataFile.unlink();
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Archive directory [{}] does not exist - creating it now",
                            directoryArchive.getAbsolutePath());
                }
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file : {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }

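    /**
     * Returns the location of the next user record after the given location, or null
     * when the end of the journal is reached. Corrupted ranges and end-of-file markers
     * are skipped; passing null starts the scan at the head of the first data file.
     */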
    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {

        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = dataFiles.getHead();
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    cur = new Location(location);
                    if (location.getSize() != -1) {
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                dataFile = dataFile.getNext();
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if (cur.getType() == 0) {
                // eof - jump to next datafile
                cur.setOffset(maxFileLength);
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

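    /**
     * Reads and returns the record stored at the given location.
     */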
    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

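    /**
     * Appends a user record to the journal and returns its location. When sync is
     * true the record is stored synchronously by the appender.
     */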
    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
        return loc;
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
        return loc;
    }

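    /**
     * Overwrites the record stored at the given location with the supplied data,
     * optionally syncing the change to disk.
     */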
    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() +
                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public synchronized DataFile getDataFileById(int dataFileId) {
        if (dataFiles.isEmpty()) {
            return null;
        }

        return fileMap.get(Integer.valueOf(dataFileId));
    }

    public synchronized DataFile getCurrentDataFile() {
        if (dataFiles.isEmpty()) {
            return null;
        }

        DataFile current = dataFiles.getTail();

        if (current != null) {
            return current;
        } else {
            return null;
        }
    }

    public synchronized Integer getCurrentDataFileId() {
        DataFile current = getCurrentDataFile();
        if (current != null) {
            return current.getDataFileId();
        } else {
            return null;
        }
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        return fileByFileMap.keySet();
    }

    public synchronized Map<Integer, DataFile> getFileMap() {
        return new TreeMap<Integer, DataFile>(fileMap);
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }

    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }

    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int)(file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey)obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}