001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.disk.journal;
018
019import java.io.*;
020import java.nio.ByteBuffer;
021import java.nio.channels.FileChannel;
022import java.util.*;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.atomic.AtomicLong;
025import java.util.concurrent.atomic.AtomicReference;
026import java.util.zip.Adler32;
027import java.util.zip.Checksum;
028import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
029import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
030import org.apache.activemq.util.*;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
034import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask;
035import org.apache.activemq.store.kahadb.disk.util.Sequence;
036
037/**
038 * Manages DataFiles
039 *
040 *
041 */
042public class Journal {
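
    /*
     * A minimal usage sketch (illustrative only - the directory and sizes below are
     * assumptions, not requirements):
     *
     *   Journal journal = new Journal();
     *   journal.setDirectory(new File("activemq-data"));   // where the db-*.log files live
     *   journal.setMaxFileLength(1024 * 1024 * 32);        // roll to a new data file at 32MB
     *   journal.start();
     *   Location loc = journal.write(new ByteSequence("hello".getBytes()), true); // sync write
     *   ByteSequence back = journal.read(loc);
     *   journal.close();
     */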
043    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
044    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));
045
046    private static final int MAX_BATCH_SIZE = 32*1024*1024;
047
    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
049    public static final int RECORD_HEAD_SPACE = 4 + 1;
050
051    public static final byte USER_RECORD_TYPE = 1;
052    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // The batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
054    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
055    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
056    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
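
    /*
     * On-disk layout of a batch control record, as implied by the constants above and
     * by checkBatchRecord() below (field sizes in bytes):
     *
     *   [4: record length][1: type = BATCH_CONTROL_RECORD_TYPE]["WRITE BATCH" magic]
     *   [4: size of the batch that follows][8: Adler32 checksum of the batch, 0 when disabled]
     *
     * BATCH_CONTROL_RECORD_SIZE is the total of the fields above; the batched records
     * start at offset + BATCH_CONTROL_RECORD_SIZE.
     */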
057
    // Bound a corrupt region when the checksum is disabled, or was itself zeroed by the corruption, to minimise data loss.
059    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
060        DataFile dataFile = getDataFile(recoveryPosition);
061        // with corruption on recovery we have no faith in the content - slip to the next batch record or eof
062        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
063        try {
064            int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
065            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
066            LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
067
068            // skip corruption on getNextLocation
069            recoveryPosition.setOffset((int) sequence.getLast() + 1);
070            recoveryPosition.setSize(-1);
071
072            dataFile.corruptedBlocks.add(sequence);
073
        } catch (IOException e) {
            // best effort scan - recovery continues with whatever bounds were recorded
075        } finally {
076            accessorPool.closeDataFileAccessor(reader);
077        }
078    }
079
080    public enum PreallocationStrategy {
081        SPARSE_FILE,
082        OS_KERNEL_COPY,
083        ZEROS;
084    }
085
086    public enum PreallocationScope {
087        ENTIRE_JOURNAL;
088    }
089
090    private static byte[] createBatchControlRecordHeader() {
091        try {
092            DataByteArrayOutputStream os = new DataByteArrayOutputStream();
093            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
094            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
095            os.write(BATCH_CONTROL_RECORD_MAGIC);
096            ByteSequence sequence = os.toByteSequence();
097            sequence.compact();
098            return sequence.getData();
099        } catch (IOException e) {
100            throw new RuntimeException("Could not create batch control record header.", e);
101        }
102    }
103
104    public static final String DEFAULT_DIRECTORY = ".";
105    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
106    public static final String DEFAULT_FILE_PREFIX = "db-";
107    public static final String DEFAULT_FILE_SUFFIX = ".log";
108    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
109    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
110    public static final int PREFERED_DIFF = 1024 * 512;
111    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
112
113    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
114
115    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
116
117    protected File directory = new File(DEFAULT_DIRECTORY);
118    protected File directoryArchive;
119    private boolean directoryArchiveOverridden = false;
120
121    protected String filePrefix = DEFAULT_FILE_PREFIX;
122    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
123    protected boolean started;
124
125    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
126    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
127
128    protected FileAppender appender;
129    protected DataFileAccessorPool accessorPool;
130
131    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
132    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
133    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
134
135    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
136    protected Runnable cleanupTask;
137    protected AtomicLong totalLength = new AtomicLong();
138    protected boolean archiveDataLogs;
139    private ReplicationTarget replicationTarget;
140    protected boolean checksum;
141    protected boolean checkForCorruptionOnStartup;
142    protected boolean enableAsyncDiskSync = true;
143    private Timer timer;
144
145    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
146    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
147
148    public interface DataFileRemovedListener {
149        void fileRemoved(DataFile datafile);
150    }
151
152    private DataFileRemovedListener dataFileRemovedListener;
153
154    public synchronized void start() throws IOException {
155        if (started) {
156            return;
157        }
158
159        long start = System.currentTimeMillis();
160        accessorPool = new DataFileAccessorPool(this);
161        started = true;
162
163        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
164
165        File[] files = directory.listFiles(new FilenameFilter() {
166            public boolean accept(File dir, String n) {
167                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
168            }
169        });
170
171        if (files != null) {
172            for (File file : files) {
173                try {
174                    String n = file.getName();
175                    String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
176                    int num = Integer.parseInt(numStr);
177                    DataFile dataFile = new DataFile(file, num);
178                    fileMap.put(dataFile.getDataFileId(), dataFile);
179                    totalLength.addAndGet(dataFile.getLength());
180                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
182                }
183            }
184
185            // Sort the list so that we can link the DataFiles together in the
186            // right order.
187            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
188            Collections.sort(l);
189            for (DataFile df : l) {
190                if (df.getLength() == 0) {
191                    // possibly the result of a previous failed write
192                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
193                    continue;
194                }
195                dataFiles.addLast(df);
196                fileByFileMap.put(df.getFile(), df);
197
198                if( isCheckForCorruptionOnStartup() ) {
199                    lastAppendLocation.set(recoveryCheck(df));
200                }
201            }
202        }
203
204        getCurrentWriteFile();
205
206        if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) {
207            LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies.");
208        }
209
210        if( lastAppendLocation.get()==null ) {
211            DataFile df = dataFiles.getTail();
212            lastAppendLocation.set(recoveryCheck(df));
213        }
214
215        // ensure we don't report unused space of last journal file in size metric
216        if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) {
217            totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
218        }
219
220
221        cleanupTask = new Runnable() {
222            public void run() {
223                cleanup();
224            }
225        };
226        this.timer = new Timer("KahaDB Scheduler", true);
227        TimerTask task = new SchedulerTimerTask(cleanupTask);
228        this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL);
229        long end = System.currentTimeMillis();
230        LOG.trace("Startup took: "+(end-start)+" ms");
231    }
232
233
234    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
235
236        if (PreallocationScope.ENTIRE_JOURNAL == preallocationScope) {
237
            if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                doPreallocationKernelCopy(file);
            } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                doPreallocationZeros(file);
            } else {
                doPreallocationSparseFile(file);
            }
        } else {
            LOG.info("Using journal preallocation scope of batch allocation");
        }
    }
251
252    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
253        try {
254            file.seek(maxFileLength - 1);
255            file.write((byte)0x00);
256        } catch (IOException e) {
257            LOG.error("Could not preallocate journal file with sparse file! Will continue without preallocation", e);
258        }
259    }
260
261    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        // ByteBuffer.allocate() returns a zero-filled buffer, ready to be written as-is
        ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
267
268        try {
269            FileChannel channel = file.getChannel();
270            channel.write(buffer);
271            channel.force(false);
272            channel.position(0);
273        } catch (IOException e) {
274            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
275        }
276    }
277
278    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
279
280        // create a template file that will be used to pre-allocate the journal files
281        File templateFile = createJournalTemplateFile();
282
283        RandomAccessFile templateRaf = null;
284        try {
285            templateRaf = new RandomAccessFile(templateFile, "rw");
286            templateRaf.setLength(maxFileLength);
287            templateRaf.getChannel().force(true);
288            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
289            templateRaf.close();
290            templateFile.delete();
291        } catch (FileNotFoundException e) {
292            LOG.error("Could not find the template file on disk at " + templateFile.getAbsolutePath(), e);
293        } catch (IOException e) {
294            LOG.error("Could not transfer the template file to journal, transferFile=" + templateFile.getAbsolutePath(), e);
295        }
296    }
297
298    private File createJournalTemplateFile() {
299        String fileName = "db-log.template";
300        File rc  = new File(directory, fileName);
301        if (rc.exists()) {
302            System.out.println("deleting file because it already exists...");
303            rc.delete();
304
305        }
306        return rc;
307    }
308
309    private static byte[] bytes(String string) {
310        try {
311            return string.getBytes("UTF-8");
312        } catch (UnsupportedEncodingException e) {
313            throw new RuntimeException(e);
314        }
315    }
316
317    protected Location recoveryCheck(DataFile dataFile) throws IOException {
318        Location location = new Location();
319        location.setDataFileId(dataFile.getDataFileId());
320        location.setOffset(0);
321
322        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
323        try {
324            while( true ) {
325                int size = checkBatchRecord(reader, location.getOffset());
326                if ( size>=0 && location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size <= dataFile.getLength()) {
327                    location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
328                } else {
329
330                    // Perhaps it's just some corruption... scan through the file to find the next valid batch record.  We
331                    // may have subsequent valid batch records.
332                    int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
333                    if( nextOffset >=0 ) {
334                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
335                        LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
336                        dataFile.corruptedBlocks.add(sequence);
337                        location.setOffset(nextOffset);
338                    } else {
339                        break;
340                    }
341                }
342            }
343
344        } catch (IOException e) {
345        } finally {
346            accessorPool.closeDataFileAccessor(reader);
347        }
348
349        int existingLen = dataFile.getLength();
350        dataFile.setLength(location.getOffset());
351        if (existingLen > dataFile.getLength()) {
352            totalLength.addAndGet(dataFile.getLength() - existingLen);
353        }
354
355        if( !dataFile.corruptedBlocks.isEmpty() ) {
356            // Is the end of the data file corrupted?
357            if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
358                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
359            }
360        }
361
362        return location;
363    }
364
365    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
366        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
367        byte data[] = new byte[1024*4];
368        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));
369
370        int pos = 0;
371        while( true ) {
372            pos = bs.indexOf(header, pos);
373            if( pos >= 0 ) {
374                return offset+pos;
375            } else {
                // need to load the next data chunk in..
377                if( bs.length != data.length ) {
378                    // If we had a short read then we were at EOF
379                    return -1;
380                }
381                offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
382                bs = new ByteSequence(data, 0, reader.read(offset, data));
383                pos=0;
384            }
385        }
386    }
387
388
389    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
390        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
391        DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
392
393        reader.readFully(offset, controlRecord);
394
        // Assert that it's a batch record.
396        for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
397            if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
398                return -1;
399            }
400        }
401
402        int size = controlIs.readInt();
403        if( size > MAX_BATCH_SIZE ) {
404            return -1;
405        }
406
407        if( isChecksum() ) {
408
409            long expectedChecksum = controlIs.readLong();
410            if( expectedChecksum == 0 ) {
                // Checksumming was not enabled when the record was stored.
412                // we can't validate the record :(
413                return size;
414            }
415
416            byte data[] = new byte[size];
417            reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
418
419            Checksum checksum = new Adler32();
420            checksum.update(data, 0, data.length);
421
422            if( expectedChecksum!=checksum.getValue() ) {
423                return -1;
424            }
425
426        }
427        return size;
428    }
429
430
431    void addToTotalLength(int size) {
432        totalLength.addAndGet(size);
433    }
434
435    public long length() {
436        return totalLength.get();
437    }
438
439    synchronized DataFile getCurrentWriteFile() throws IOException {
440        if (dataFiles.isEmpty()) {
441            rotateWriteFile();
442        }
443        return dataFiles.getTail();
444    }
445
446    synchronized DataFile rotateWriteFile() {
447        int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
448        File file = getFile(nextNum);
449        DataFile nextWriteFile = new DataFile(file, nextNum);
450        fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
451        fileByFileMap.put(file, nextWriteFile);
452        dataFiles.addLast(nextWriteFile);
453        return nextWriteFile;
454    }
455
456    public File getFile(int nextNum) {
457        String fileName = filePrefix + nextNum + fileSuffix;
458        File file = new File(directory, fileName);
459        return file;
460    }
461
462    synchronized DataFile getDataFile(Location item) throws IOException {
463        Integer key = Integer.valueOf(item.getDataFileId());
464        DataFile dataFile = fileMap.get(key);
465        if (dataFile == null) {
466            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
467            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
468        }
469        return dataFile;
470    }
471
472    synchronized File getFile(Location item) throws IOException {
473        Integer key = Integer.valueOf(item.getDataFileId());
474        DataFile dataFile = fileMap.get(key);
475        if (dataFile == null) {
476            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
477            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
478        }
479        return dataFile.getFile();
480    }
481
482    private DataFile getNextDataFile(DataFile dataFile) {
483        return dataFile.getNext();
484    }
485
486    public void close() throws IOException {
487        synchronized (this) {
488            if (!started) {
489                return;
490            }
491            if (this.timer != null) {
492                this.timer.cancel();
493            }
494            accessorPool.close();
495        }
        // the appender can be calling back to the journal, blocking a close - AMQ-5620
497        appender.close();
498        synchronized (this) {
499            fileMap.clear();
500            fileByFileMap.clear();
501            dataFiles.clear();
502            lastAppendLocation.set(null);
503            started = false;
504        }
505    }
506
507    protected synchronized void cleanup() {
508        if (accessorPool != null) {
509            accessorPool.disposeUnused();
510        }
511    }
512
513    public synchronized boolean delete() throws IOException {
514
515        // Close all open file handles...
516        appender.close();
517        accessorPool.close();
518
519        boolean result = true;
520        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
521            DataFile dataFile = i.next();
522            result &= dataFile.delete();
523        }
524        totalLength.set(0);
525        fileMap.clear();
526        fileByFileMap.clear();
527        lastAppendLocation.set(null);
528        dataFiles = new LinkedNodeList<DataFile>();
529
530        // reopen open file handles...
531        accessorPool = new DataFileAccessorPool(this);
532        appender = new DataFileAppender(this);
533        return result;
534    }
535
536    public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
537        for (Integer key : files) {
538            // Can't remove the data file (or subsequent files) that is currently being written to.
539            if( key >= lastAppendLocation.get().getDataFileId() ) {
540                continue;
541            }
542            DataFile dataFile = fileMap.get(key);
543            if( dataFile!=null ) {
544                forceRemoveDataFile(dataFile);
545            }
546        }
547    }
548
549    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
550        accessorPool.disposeDataFileAccessors(dataFile);
551        fileByFileMap.remove(dataFile.getFile());
552        fileMap.remove(dataFile.getDataFileId());
553        totalLength.addAndGet(-dataFile.getLength());
554        dataFile.unlink();
555        if (archiveDataLogs) {
556            File directoryArchive = getDirectoryArchive();
557            if (directoryArchive.exists()) {
558                LOG.debug("Archive directory exists: {}", directoryArchive);
559            } else {
561                if (LOG.isDebugEnabled()) {
562                    LOG.debug("Archive directory [{}] does not exist - creating it now",
563                            directoryArchive.getAbsolutePath());
564                }
565                IOHelper.mkdirs(directoryArchive);
566            }
567            LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath());
568            dataFile.move(directoryArchive);
569            LOG.debug("Successfully moved data file");
570        } else {
571            LOG.debug("Deleting data file: {}", dataFile);
572            if ( dataFile.delete() ) {
573                LOG.debug("Discarded data file: {}", dataFile);
574            } else {
575                LOG.warn("Failed to discard data file : {}", dataFile.getFile());
576            }
577        }
578        if (dataFileRemovedListener != null) {
579            dataFileRemovedListener.fileRemoved(dataFile);
580        }
581    }
582
583    /**
584     * @return the maxFileLength
585     */
586    public int getMaxFileLength() {
587        return maxFileLength;
588    }
589
590    /**
591     * @param maxFileLength the maxFileLength to set
592     */
593    public void setMaxFileLength(int maxFileLength) {
594        this.maxFileLength = maxFileLength;
595    }
596
597    @Override
598    public String toString() {
599        return directory.toString();
600    }
601
602    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
603
604        Location cur = null;
605        while (true) {
606            if (cur == null) {
607                if (location == null) {
608                    DataFile head = dataFiles.getHead();
609                    if( head == null ) {
610                        return null;
611                    }
612                    cur = new Location();
613                    cur.setDataFileId(head.getDataFileId());
614                    cur.setOffset(0);
615                } else {
616                    // Set to the next offset..
617                    if (location.getSize() == -1) {
618                        cur = new Location(location);
619                    } else {
620                        cur = new Location(location);
621                        cur.setOffset(location.getOffset() + location.getSize());
622                    }
623                }
624            } else {
625                cur.setOffset(cur.getOffset() + cur.getSize());
626            }
627
628            DataFile dataFile = getDataFile(cur);
629
630            // Did it go into the next file??
631            if (dataFile.getLength() <= cur.getOffset()) {
632                dataFile = getNextDataFile(dataFile);
633                if (dataFile == null) {
634                    return null;
635                } else {
636                    cur.setDataFileId(dataFile.getDataFileId().intValue());
637                    cur.setOffset(0);
638                }
639            }
640
641            // Load in location size and type.
642            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
643            try {
644                reader.readLocationDetails(cur);
645            } finally {
646                accessorPool.closeDataFileAccessor(reader);
647            }
648
649            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
650            if (corruptedRange != null) {
651                // skip corruption
652                cur.setSize((int) corruptedRange.range());
653            } else if (cur.getType() == 0) {
654                // eof - jump to next datafile
655                cur.setOffset(maxFileLength);
656            } else if (cur.getType() == USER_RECORD_TYPE) {
657                // Only return user records.
658                return cur;
659            }
660        }
661    }
662
663    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
664        DataFile dataFile = getDataFile(location);
665        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
666        ByteSequence rc = null;
667        try {
668            rc = reader.readRecord(location);
669        } finally {
670            accessorPool.closeDataFileAccessor(reader);
671        }
672        return rc;
673    }
674
675    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
676        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
677        return loc;
678    }
679
680    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
681        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
682        return loc;
683    }
684
685    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
686        DataFile dataFile = getDataFile(location);
687        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
688        try {
689            updater.updateRecord(location, data, sync);
690        } finally {
691            accessorPool.closeDataFileAccessor(updater);
692        }
693    }
694
695    public PreallocationStrategy getPreallocationStrategy() {
696        return preallocationStrategy;
697    }
698
699    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
700        this.preallocationStrategy = preallocationStrategy;
701    }
702
703    public PreallocationScope getPreallocationScope() {
704        return preallocationScope;
705    }
706
707    public void setPreallocationScope(PreallocationScope preallocationScope) {
708        this.preallocationScope = preallocationScope;
709    }
710
711    public File getDirectory() {
712        return directory;
713    }
714
715    public void setDirectory(File directory) {
716        this.directory = directory;
717    }
718
719    public String getFilePrefix() {
720        return filePrefix;
721    }
722
723    public void setFilePrefix(String filePrefix) {
724        this.filePrefix = filePrefix;
725    }
726
727    public Map<WriteKey, WriteCommand> getInflightWrites() {
728        return inflightWrites;
729    }
730
731    public Location getLastAppendLocation() {
732        return lastAppendLocation.get();
733    }
734
735    public void setLastAppendLocation(Location lastSyncedLocation) {
736        this.lastAppendLocation.set(lastSyncedLocation);
737    }
738
739    public File getDirectoryArchive() {
740        if (!directoryArchiveOverridden && (directoryArchive == null)) {
741            // create the directoryArchive relative to the journal location
742            directoryArchive = new File(directory.getAbsolutePath() +
743                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
744        }
745        return directoryArchive;
746    }
747
748    public void setDirectoryArchive(File directoryArchive) {
749        directoryArchiveOverridden = true;
750        this.directoryArchive = directoryArchive;
751    }
752
753    public boolean isArchiveDataLogs() {
754        return archiveDataLogs;
755    }
756
757    public void setArchiveDataLogs(boolean archiveDataLogs) {
758        this.archiveDataLogs = archiveDataLogs;
759    }
760
    public synchronized Integer getCurrentDataFileId() {
762        if (dataFiles.isEmpty())
763            return null;
764        return dataFiles.getTail().getDataFileId();
765    }
766
767    /**
768     * Get a set of files - only valid after start()
769     *
770     * @return files currently being used
771     */
772    public Set<File> getFiles() {
773        return fileByFileMap.keySet();
774    }
775
776    public synchronized Map<Integer, DataFile> getFileMap() {
777        return new TreeMap<Integer, DataFile>(fileMap);
778    }
779
780    public long getDiskSize() {
781        return totalLength.get();
782    }
783
784    public void setReplicationTarget(ReplicationTarget replicationTarget) {
785        this.replicationTarget = replicationTarget;
786    }
787    public ReplicationTarget getReplicationTarget() {
788        return replicationTarget;
789    }
790
791    public String getFileSuffix() {
792        return fileSuffix;
793    }
794
795    public void setFileSuffix(String fileSuffix) {
796        this.fileSuffix = fileSuffix;
797    }
798
799    public boolean isChecksum() {
800        return checksum;
801    }
802
803    public void setChecksum(boolean checksumWrites) {
804        this.checksum = checksumWrites;
805    }
806
807    public boolean isCheckForCorruptionOnStartup() {
808        return checkForCorruptionOnStartup;
809    }
810
811    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
812        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
813    }
814
815    public void setWriteBatchSize(int writeBatchSize) {
816        this.writeBatchSize = writeBatchSize;
817    }
818
819    public int getWriteBatchSize() {
820        return writeBatchSize;
821    }
822
823    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
824       this.totalLength = storeSizeAccumulator;
825    }
826
827    public void setEnableAsyncDiskSync(boolean val) {
828        this.enableAsyncDiskSync = val;
829    }
830
831    public boolean isEnableAsyncDiskSync() {
832        return enableAsyncDiskSync;
833    }
834
835    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
836        this.dataFileRemovedListener = dataFileRemovedListener;
837    }
838
839    public static class WriteCommand extends LinkedNode<WriteCommand> {
840        public final Location location;
841        public final ByteSequence data;
842        final boolean sync;
843        public final Runnable onComplete;
844
845        public WriteCommand(Location location, ByteSequence data, boolean sync) {
846            this.location = location;
847            this.data = data;
848            this.sync = sync;
849            this.onComplete = null;
850        }
851
852        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
853            this.location = location;
854            this.data = data;
855            this.onComplete = onComplete;
856            this.sync = false;
857        }
858    }
859
860    public static class WriteKey {
861        private final int file;
862        private final long offset;
863        private final int hash;
864
865        public WriteKey(Location item) {
866            file = item.getDataFileId();
867            offset = item.getOffset();
868            // TODO: see if we can build a better hash
869            hash = (int)(file ^ offset);
870        }
871
872        public int hashCode() {
873            return hash;
874        }
875
876        public boolean equals(Object obj) {
877            if (obj instanceof WriteKey) {
878                WriteKey di = (WriteKey)obj;
879                return di.file == file && di.offset == offset;
880            }
881            return false;
882        }
883    }
884}