/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the set of data files that make up a journal: appending new records,
 * reading them back by {@link Location}, rotating to new files, and recovering
 * valid data after an unclean shutdown.
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int MAX_BATCH_SIZE = 32*1024*1024;

    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // A batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
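
    // On-disk layout of a batch control record, as implied by the constants above:
    //
    //    4 bytes  record length (BATCH_CONTROL_RECORD_SIZE)
    //    1 byte   record type   (BATCH_CONTROL_RECORD_TYPE)
    //   11 bytes  magic         ("WRITE BATCH" in UTF-8)
    //    4 bytes  size of the batch payload that follows
    //    8 bytes  Adler32 checksum of the payload (0 if checksumming was disabled)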

    private static byte[] createBatchControlRecordHeader() {
        try {
            DataByteArrayOutputStream os = new DataByteArrayOutputStream();
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int PREFERED_DIFF = 1024 * 512;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected Runnable cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private Timer timer;

    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;
        preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num, preferedFileLength);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the naming pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if( isCheckForCorruptionOnStartup() ) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        getCurrentWriteFile();

        if( lastAppendLocation.get()==null ) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        cleanupTask = new Runnable() {
            public void run() {
                cleanup();
            }
        };
        this.timer = new Timer("KahaDB Scheduler", true);
        TimerTask task = new SchedulerTimerTask(cleanupTask);
        this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL);
        long end = System.currentTimeMillis();
        LOG.trace("Startup took: "+(end-start)+" ms");
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            while( true ) {
                int size = checkBatchRecord(reader, location.getOffset());
                if ( size>=0 ) {
                    location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
                } else {

                    // Perhaps it's just some corruption... scan ahead through the
                    // file for the next valid batch record, since subsequent
                    // batches may still be intact.
                    int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
                    if( nextOffset >=0 ) {
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.info("Corrupt journal records found in '"+dataFile.getFile()+"' between offsets: "+sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException e) {
            // Reached the end of the data file (or an unreadable record); treat
            // this as the end of the valid data.
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if( !dataFile.corruptedBlocks.isEmpty() ) {
            // Is the end of the data file corrupted?
            if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }
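
    // Worked example of the scan above, with illustrative offsets: given a
    // 28 byte control record (BATCH_CONTROL_RECORD_SIZE) followed by a 100 byte
    // batch at offset 0, the first pass advances the offset to 128. If the
    // record at 128 fails validation but a valid header turns up again at 400,
    // offsets 128..399 are recorded as a corrupted block and the scan resumes
    // at 400.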

    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        byte data[] = new byte[1024*4];
        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));

        int pos = 0;
        while( true ) {
            pos = bs.indexOf(header, pos);
            if( pos >= 0 ) {
                return offset+pos;
            } else {
                // need to load the next data chunk in..
                if( bs.length != data.length ) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                // Overlap the chunks by the header length so that a header which
                // straddles a chunk boundary is still found.
                offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
                bs = new ByteSequence(data, 0, reader.read(offset, data));
                pos=0;
            }
        }
    }
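
    // Chunk arithmetic for the scan above: the header is RECORD_HEAD_SPACE
    // (5 bytes) plus the 11 byte magic, i.e. 16 bytes, and the read buffer is
    // 4096 bytes, so successive reads begin at offsets 0, 4080, 8160, ... and a
    // header that starts near the end of one chunk is re-read at the start of
    // the next.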

    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
        DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);

        reader.readFully(offset, controlRecord);

        // Verify that this is a batch control record.
        for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
            if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
                return -1;
            }
        }

        int size = controlIs.readInt();
        if( size > MAX_BATCH_SIZE ) {
            return -1;
        }

        if( isChecksum() ) {

            long expectedChecksum = controlIs.readLong();
            if( expectedChecksum == 0 ) {
                // Checksumming was not enabled when the record was stored,
                // so we cannot validate the record.
                return size;
            }

            byte data[] = new byte[size];
            reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);

            Checksum checksum = new Adler32();
            checksum.update(data, 0, data.length);

            if( expectedChecksum!=checksum.getValue() ) {
                return -1;
            }

        }
        return size;
    }
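
    // Validation sketch: the checksum stored in the control record is assumed
    // to be an Adler32 computed over the batch payload on the write side,
    // mirroring the read-side check above:
    //
    //   Checksum checksum = new Adler32();
    //   checksum.update(data, 0, data.length);
    //   boolean valid = checksum.getValue() == expectedChecksum;
    //
    // A stored checksum of 0 is a sentinel meaning "not computed" and is
    // accepted without validation.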

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }

    synchronized DataFile getCurrentWriteFile() throws IOException {
        if (dataFiles.isEmpty()) {
            rotateWriteFile();
        }
        return dataFiles.getTail();
    }

    synchronized DataFile rotateWriteFile() {
        int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
        // actually allocate the disk space
        fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
        fileByFileMap.put(file, nextWriteFile);
        dataFiles.addLast(nextWriteFile);
        return nextWriteFile;
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }
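
    // With the default prefix "db-" and suffix ".log", getFile(1) resolves to
    // <directory>/db-1.log, getFile(2) to <directory>/db-2.log, and so on; the
    // numeric id in the name is what start() parses back out on recovery.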

    synchronized DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = fileMap.get(key);
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    synchronized File getFile(Location item) throws IOException {
        // Delegates to getDataFile() so the lookup and error handling live in one place.
        return getDataFile(item).getFile();
    }

    private DataFile getNextDataFile(DataFile dataFile) {
        return dataFile.getNext();
    }

    public synchronized void close() throws IOException {
        if (!started) {
            return;
        }
        if (this.timer != null) {
            this.timer.cancel();
        }
        accessorPool.close();
        appender.close();
        fileMap.clear();
        fileByFileMap.clear();
        dataFiles.clear();
        lastAppendLocation.set(null);
        started = false;
    }

    protected synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            totalLength.addAndGet(-dataFile.getLength());
            result &= dataFile.delete();
        }
        fileMap.clear();
        fileByFileMap.clear();
        lastAppendLocation.set(null);
        dataFiles = new LinkedNodeList<DataFile>();

        // Recreate the file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

    public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if( key >= lastAppendLocation.get().getDataFileId() ) {
                continue;
            }
            DataFile dataFile = fileMap.get(key);
            if( dataFile!=null ) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        fileByFileMap.remove(dataFile.getFile());
        fileMap.remove(dataFile.getDataFileId());
        totalLength.addAndGet(-dataFile.getLength());
        dataFile.unlink();
        if (archiveDataLogs) {
            dataFile.move(getDirectoryArchive());
            LOG.debug("Moved data file " + dataFile + " to " + getDirectoryArchive());
        } else {
            if ( dataFile.delete() ) {
                LOG.debug("Discarded data file " + dataFile);
            } else {
                LOG.warn("Failed to discard data file " + dataFile.getFile());
            }
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }

    public synchronized void appendedExternally(Location loc, int length) throws IOException {
        DataFile dataFile = null;
        if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
            // It's an update to the current log file..
            dataFile = dataFiles.getTail();
            dataFile.incrementLength(length);
        } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
            // It's an update to the next log file.
            int nextNum = loc.getDataFileId();
            File file = getFile(nextNum);
            dataFile = new DataFile(file, nextNum, preferedFileLength);
            // actually allocate the disk space
            fileMap.put(dataFile.getDataFileId(), dataFile);
            fileByFileMap.put(file, dataFile);
            dataFiles.addLast(dataFile);
        } else {
            throw new IOException("Invalid external append: " + loc + " does not target the current or next data file.");
        }
    }

    public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {

        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = dataFiles.getHead();
                    if( head == null ) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    cur = new Location(location);
                    if (location.getSize() != -1) {
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                dataFile = getNextDataFile(dataFile);
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            if (cur.getType() == 0) {
                return null;
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }
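
    // Typical replay loop over user records, for illustration: start with a
    // null location to get the first record, then feed each result back in:
    //
    //   for (Location l = journal.getNextLocation(null); l != null;
    //        l = journal.getNextLocation(l)) {
    //       ByteSequence record = journal.read(l);
    //       // process the record...
    //   }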

    public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
        DataFile df = fileByFileMap.get(file);
        return getNextLocation(df, lastLocation, thisFileOnly);
    }

    public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {

        Location cur = null;
        while (true) {
            if (cur == null) {
                if (lastLocation == null) {
                    DataFile head = dataFile.getHeadNode();
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    cur = new Location(lastLocation);
                    cur.setOffset(cur.getOffset() + cur.getSize());
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            // Did it go into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                if (thisFileOnly) {
                    return null;
                } else {
                    dataFile = getNextDataFile(dataFile);
                    if (dataFile == null) {
                        return null;
                    } else {
                        cur.setDataFileId(dataFile.getDataFileId().intValue());
                        cur.setOffset(0);
                    }
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            if (cur.getType() == 0) {
                return null;
            } else if (cur.getType() > 0) {
                // Unlike getNextLocation(Location), this variant returns any valid
                // record: both user and batch control records have a positive type.
                return cur;
            }
        }
    }

    public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        return appender.storeItem(data, Location.USER_TYPE, sync);
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        return appender.storeItem(data, Location.USER_TYPE, onComplete);
    }
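
    // Write-side usage sketch (the directory below is hypothetical):
    //
    //   Journal journal = new Journal();
    //   journal.setDirectory(new File("/var/data/journal"));
    //   journal.start();
    //   Location loc = journal.write(new ByteSequence("payload".getBytes()), true); // sync write
    //   journal.write(new ByteSequence("more".getBytes()), new Runnable() {
    //       public void run() {
    //           // invoked once the batch containing this record reaches disk
    //       }
    //   });
    //   journal.close();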

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public synchronized Integer getCurrentDataFileId() {
        if (dataFiles.isEmpty()) {
            return null;
        }
        return dataFiles.getTail().getDataFileId();
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        return fileByFileMap.keySet();
    }

    public synchronized Map<Integer, DataFile> getFileMap() {
        return new TreeMap<Integer, DataFile>(fileMap);
    }

    public long getDiskSize() {
        long tailLength=0;
        synchronized( this ) {
            if( !dataFiles.isEmpty() ) {
                tailLength = dataFiles.getTail().getLength();
            }
        }

        long rc = totalLength.get();

        // The tail file is counted as at least preferedFileLength bytes, since
        // that much space is reserved for it.
        if( tailLength < preferedFileLength ) {
            rc -= tailLength;
            rc += preferedFileLength;
        }
        return rc;
    }
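
    // Illustrative arithmetic with the 32 MB default maxFileLength (so
    // preferedFileLength is about 31.5 MB): if totalLength is 100 MB and the
    // tail file currently holds 1 MB, getDiskSize() reports
    // 100 - 1 + 31.5 = 130.5 MB.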

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }

    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int)(file ^ offset);
        }
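
        // Note on the XOR hash above: distinct keys collide whenever
        // file ^ offset matches, e.g. (file=1, offset=0) and (file=0, offset=1)
        // both hash to 1. equals() disambiguates, at the cost of extra probing.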

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey)obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}