001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.kaha.impl.async;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.DataInputStream;
022    import java.io.DataOutputStream;
023    import java.io.File;
024    import java.io.FilenameFilter;
025    import java.io.IOException;
026    import java.util.ArrayList;
027    import java.util.Collections;
028    import java.util.HashMap;
029    import java.util.HashSet;
030    import java.util.Iterator;
031    import java.util.LinkedHashMap;
032    import java.util.List;
033    import java.util.Map;
034    import java.util.Set;
035    import java.util.concurrent.ConcurrentHashMap;
036    import java.util.concurrent.atomic.AtomicLong;
037    import java.util.concurrent.atomic.AtomicReference;
038    import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
039    import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
040    import org.apache.activemq.thread.Scheduler;
041    import org.apache.activemq.util.ByteSequence;
042    import org.apache.activemq.util.IOHelper;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    
047    
048    /**
049     * Manages DataFiles
050     * 
051     * 
052     */
053    public class AsyncDataManager {
054    
055        public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
056        public static final int ITEM_HEAD_RESERVED_SPACE = 21;
057        // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
058        public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE + 3;
059        public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3;
060        public static final int ITEM_FOOT_SPACE = 3; // EOR
061    
062        public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE;
063    
064        public static final byte[] ITEM_HEAD_SOR = new byte[] {'S', 'O', 'R'}; // 
065        public static final byte[] ITEM_HEAD_EOR = new byte[] {'E', 'O', 'R'}; // 
066    
067        public static final byte DATA_ITEM_TYPE = 1;
068        public static final byte REDO_ITEM_TYPE = 2;
069        public static final String DEFAULT_DIRECTORY = "data";
070        public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
071        public static final String DEFAULT_FILE_PREFIX = "data-";
072        public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
073        public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
074        public static final int PREFERED_DIFF = 1024 * 512;
075    
076        private static final Logger LOG = LoggerFactory.getLogger(AsyncDataManager.class);
077        protected Scheduler scheduler;
078    
079        protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
080    
081        protected File directory = new File(DEFAULT_DIRECTORY);
082        protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
083        protected String filePrefix = DEFAULT_FILE_PREFIX;
084        protected ControlFile controlFile;
085        protected boolean started;
086        protected boolean useNio = true;
087    
088        protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
089        protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
090    
091        protected DataFileAppender appender;
092        protected DataFileAccessorPool accessorPool;
093    
094        protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
095        protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
096        protected DataFile currentWriteFile;
097    
098        protected Location mark;
099        protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
100        protected Runnable cleanupTask;
101        protected final AtomicLong storeSize;
102        protected boolean archiveDataLogs;
103        
104        public AsyncDataManager(AtomicLong storeSize) {
105            this.storeSize=storeSize;
106        }
107        
108        public AsyncDataManager() {
109            this(new AtomicLong());
110        }
111    
112        @SuppressWarnings("unchecked")
113        public synchronized void start() throws IOException {
114            if (started) {
115                return;
116            }
117    
118            started = true;
119            preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF);
120            lock();
121    
122            accessorPool = new DataFileAccessorPool(this);
123            ByteSequence sequence = controlFile.load();
124            if (sequence != null && sequence.getLength() > 0) {
125                unmarshallState(sequence);
126            }
127            if (useNio) {
128                appender = new NIODataFileAppender(this);
129            } else {
130                appender = new DataFileAppender(this);
131            }
132    
133            File[] files = directory.listFiles(new FilenameFilter() {
134                public boolean accept(File dir, String n) {
135                    return dir.equals(directory) && n.startsWith(filePrefix);
136                }
137            });
138           
139            if (files != null) {
140                for (int i = 0; i < files.length; i++) {
141                    try {
142                        File file = files[i];
143                        String n = file.getName();
144                        String numStr = n.substring(filePrefix.length(), n.length());
145                        int num = Integer.parseInt(numStr);
146                        DataFile dataFile = new DataFile(file, num, preferedFileLength);
147                        fileMap.put(dataFile.getDataFileId(), dataFile);
148                        storeSize.addAndGet(dataFile.getLength());
149                    } catch (NumberFormatException e) {
150                        // Ignore file that do not match the pattern.
151                    }
152                }
153    
154                // Sort the list so that we can link the DataFiles together in the
155                // right order.
156                List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
157                Collections.sort(l);
158                currentWriteFile = null;
159                for (DataFile df : l) {
160                    if (currentWriteFile != null) {
161                        currentWriteFile.linkAfter(df);
162                    }
163                    currentWriteFile = df;
164                    fileByFileMap.put(df.getFile(), df);
165                }
166            }
167    
168            // Need to check the current Write File to see if there was a partial
169            // write to it.
170            if (currentWriteFile != null) {
171    
172                // See if the lastSyncedLocation is valid..
173                Location l = lastAppendLocation.get();
174                if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
175                    l = null;
176                }
177    
178                // If we know the last location that was ok.. then we can skip lots
179                // of checking
180                try{
181                l = recoveryCheck(currentWriteFile, l);
182                lastAppendLocation.set(l);
183                }catch(IOException e){
184                    LOG.warn("recovery check failed", e);
185                }
186            }
187    
188            storeState(false);
189    
190            cleanupTask = new Runnable() {
191                public void run() {
192                    cleanup();
193                }
194            };
195            this.scheduler = new Scheduler("AsyncDataManager Scheduler");
196            try {
197                this.scheduler.start();
198            } catch (Exception e) {
199                IOException ioe =  new IOException("scheduler start: " + e);
200                ioe.initCause(e);
201                throw ioe;
202            }
203            this.scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
204        }
205    
206        public void lock() throws IOException {
207            synchronized (this) {
208                if (controlFile == null || controlFile.isDisposed()) {
209                    IOHelper.mkdirs(directory);
210                    controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
211                }
212                controlFile.lock();
213            }
214        }
215    
216        protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
217            if (location == null) {
218                location = new Location();
219                location.setDataFileId(dataFile.getDataFileId());
220                location.setOffset(0);
221            }
222            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
223            try {
224                reader.readLocationDetails(location);
225                while (reader.readLocationDetailsAndValidate(location)) {
226                    location.setOffset(location.getOffset() + location.getSize());
227                }
228            } finally {
229                accessorPool.closeDataFileAccessor(reader);
230            }
231            dataFile.setLength(location.getOffset());
232            return location;
233        }
234    
235        protected void unmarshallState(ByteSequence sequence) throws IOException {
236            ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
237            DataInputStream dis = new DataInputStream(bais);
238            if (dis.readBoolean()) {
239                mark = new Location();
240                mark.readExternal(dis);
241            } else {
242                mark = null;
243            }
244            if (dis.readBoolean()) {
245                Location l = new Location();
246                l.readExternal(dis);
247                lastAppendLocation.set(l);
248            } else {
249                lastAppendLocation.set(null);
250            }
251        }
252    
253        private synchronized ByteSequence marshallState() throws IOException {
254            ByteArrayOutputStream baos = new ByteArrayOutputStream();
255            DataOutputStream dos = new DataOutputStream(baos);
256    
257            if (mark != null) {
258                dos.writeBoolean(true);
259                mark.writeExternal(dos);
260            } else {
261                dos.writeBoolean(false);
262            }
263            Location l = lastAppendLocation.get();
264            if (l != null) {
265                dos.writeBoolean(true);
266                l.writeExternal(dos);
267            } else {
268                dos.writeBoolean(false);
269            }
270    
271            byte[] bs = baos.toByteArray();
272            return new ByteSequence(bs, 0, bs.length);
273        }
274    
275        synchronized DataFile allocateLocation(Location location) throws IOException {
276            if (currentWriteFile == null || ((currentWriteFile.getLength() + location.getSize()) > maxFileLength)) {
277                int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
278    
279                String fileName = filePrefix + nextNum;
280                File file = new File(directory, fileName);
281                DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
282                //actually allocate the disk space
283                nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true));
284                fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
285                fileByFileMap.put(file, nextWriteFile);
286                if (currentWriteFile != null) {
287                    currentWriteFile.linkAfter(nextWriteFile);
288                    if (currentWriteFile.isUnused()) {
289                        removeDataFile(currentWriteFile);
290                    }
291                }
292                currentWriteFile = nextWriteFile;
293    
294            }
295            location.setOffset(currentWriteFile.getLength());
296            location.setDataFileId(currentWriteFile.getDataFileId().intValue());
297            int size = location.getSize();
298            currentWriteFile.incrementLength(size);
299            currentWriteFile.increment();
300            storeSize.addAndGet(size);
301            return currentWriteFile;
302        }
303        
304        public synchronized void removeLocation(Location location) throws IOException{
305           
306            DataFile dataFile = getDataFile(location);
307            dataFile.decrement();
308        }
309    
310        synchronized DataFile getDataFile(Location item) throws IOException {
311            Integer key = Integer.valueOf(item.getDataFileId());
312            DataFile dataFile = fileMap.get(key);
313            if (dataFile == null) {
314                LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
315                throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
316            }
317            return dataFile;
318        }
319        
320        synchronized File getFile(Location item) throws IOException {
321            Integer key = Integer.valueOf(item.getDataFileId());
322            DataFile dataFile = fileMap.get(key);
323            if (dataFile == null) {
324                LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
325                throw new IOException("Could not locate data file " + filePrefix  + item.getDataFileId());
326            }
327            return dataFile.getFile();
328        }
329    
330        private DataFile getNextDataFile(DataFile dataFile) {
331            return (DataFile)dataFile.getNext();
332        }
333    
334        public synchronized void close() throws IOException {
335            if (!started) {
336                return;
337            }
338            this.scheduler.cancel(cleanupTask);
339            try {
340                this.scheduler.stop();
341            } catch (Exception e) {
342                IOException ioe = new IOException("scheduler stop: " + e);
343                ioe.initCause(e);
344                throw ioe;
345            }
346            accessorPool.close();
347            storeState(false);
348            appender.close();
349            fileMap.clear();
350            fileByFileMap.clear();
351            controlFile.unlock();
352            controlFile.dispose();
353            started = false;
354        }
355    
356        synchronized void cleanup() {
357            if (accessorPool != null) {
358                accessorPool.disposeUnused();
359            }
360        }
361    
362        public synchronized boolean delete() throws IOException {
363    
364            // Close all open file handles...
365            appender.close();
366            accessorPool.close();
367    
368            boolean result = true;
369            for (Iterator i = fileMap.values().iterator(); i.hasNext();) {
370                DataFile dataFile = (DataFile)i.next();
371                storeSize.addAndGet(-dataFile.getLength());
372                result &= dataFile.delete();
373            }
374            fileMap.clear();
375            fileByFileMap.clear();
376            lastAppendLocation.set(null);
377            mark = null;
378            currentWriteFile = null;
379    
380            // reopen open file handles...
381            accessorPool = new DataFileAccessorPool(this);
382            if (useNio) {
383                appender = new NIODataFileAppender(this);
384            } else {
385                appender = new DataFileAppender(this);
386            }
387            return result;
388        }
389    
390        public synchronized void addInterestInFile(int file) throws IOException {
391            if (file >= 0) {
392                Integer key = Integer.valueOf(file);
393                DataFile dataFile = fileMap.get(key);
394                if (dataFile == null) {
395                    throw new IOException("That data file does not exist");
396                }
397                addInterestInFile(dataFile);
398            }
399        }
400    
401        synchronized void addInterestInFile(DataFile dataFile) {
402            if (dataFile != null) {
403                dataFile.increment();
404            }
405        }
406    
407        public synchronized void removeInterestInFile(int file) throws IOException {
408            if (file >= 0) {
409                Integer key = Integer.valueOf(file);
410                DataFile dataFile = fileMap.get(key);
411                removeInterestInFile(dataFile);
412            }
413           
414        }
415    
416        synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
417            if (dataFile != null) {
418                if (dataFile.decrement() <= 0) {
419                    removeDataFile(dataFile);
420                }
421            }
422        }
423    
424        public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer>inProgress) throws IOException {
425            Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
426            unUsed.removeAll(inUse);
427            unUsed.removeAll(inProgress);
428                    
429            List<DataFile> purgeList = new ArrayList<DataFile>();
430            for (Integer key : unUsed) {
431                DataFile dataFile = fileMap.get(key);
432                purgeList.add(dataFile);
433            }
434            for (DataFile dataFile : purgeList) {
435                if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) {
436                    forceRemoveDataFile(dataFile);
437                }
438            }
439        }
440    
441        public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
442            Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
443            unUsed.removeAll(inUse);
444                    
445            List<DataFile> purgeList = new ArrayList<DataFile>();
446            for (Integer key : unUsed) {
447                    // Only add files less than the lastFile..
448                    if( key.intValue() < lastFile.intValue() ) {
449                    DataFile dataFile = fileMap.get(key);
450                    purgeList.add(dataFile);
451                    }
452            }
453            if (LOG.isDebugEnabled()) {
454                LOG.debug("lastFileId=" + lastFile + ", purgeList: (" + purgeList.size() + ") " + purgeList);
455            }
456            for (DataFile dataFile : purgeList) {
457                forceRemoveDataFile(dataFile);
458            }
459            }
460    
461        public synchronized void consolidateDataFiles() throws IOException {
462            List<DataFile> purgeList = new ArrayList<DataFile>();
463            for (DataFile dataFile : fileMap.values()) {
464                if (dataFile.isUnused()) {
465                    purgeList.add(dataFile);
466                }
467            }
468            for (DataFile dataFile : purgeList) {
469                removeDataFile(dataFile);
470            }
471        }
472    
473        private synchronized void removeDataFile(DataFile dataFile) throws IOException {
474    
475            // Make sure we don't delete too much data.
476            if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
477                LOG.debug("Won't remove DataFile" + dataFile);
478                    return;
479            }
480            forceRemoveDataFile(dataFile);
481        }
482        
483        private synchronized void forceRemoveDataFile(DataFile dataFile)
484                throws IOException {
485            accessorPool.disposeDataFileAccessors(dataFile);
486            fileByFileMap.remove(dataFile.getFile());
487            fileMap.remove(dataFile.getDataFileId());
488            storeSize.addAndGet(-dataFile.getLength());
489            dataFile.unlink();
490            if (archiveDataLogs) {
491                dataFile.move(getDirectoryArchive());
492                LOG.debug("moved data file " + dataFile + " to "
493                        + getDirectoryArchive());
494            } else {
495                boolean result = dataFile.delete();
496                if (!result) {
497                    LOG.info("Failed to discard data file " + dataFile);
498                }
499            }
500        }
501    
502        /**
503         * @return the maxFileLength
504         */
505        public int getMaxFileLength() {
506            return maxFileLength;
507        }
508    
509        /**
510         * @param maxFileLength the maxFileLength to set
511         */
512        public void setMaxFileLength(int maxFileLength) {
513            this.maxFileLength = maxFileLength;
514        }
515    
516        @Override
517        public String toString() {
518            return "DataManager:(" + filePrefix + ")";
519        }
520    
521        public synchronized Location getMark() throws IllegalStateException {
522            return mark;
523        }
524    
525        public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
526    
527            Location cur = null;
528            while (true) {
529                if (cur == null) {
530                    if (location == null) {
531                        DataFile head = (DataFile)currentWriteFile.getHeadNode();
532                        cur = new Location();
533                        cur.setDataFileId(head.getDataFileId());
534                        cur.setOffset(0);
535                    } else {
536                        // Set to the next offset..
537                            if( location.getSize() == -1 ) {
538                                    cur = new Location(location);
539                            }  else {
540                                    cur = new Location(location);
541                                    cur.setOffset(location.getOffset()+location.getSize());
542                            }
543                    }
544                } else {
545                    cur.setOffset(cur.getOffset() + cur.getSize());
546                }
547    
548                DataFile dataFile = getDataFile(cur);
549    
550                // Did it go into the next file??
551                if (dataFile.getLength() <= cur.getOffset()) {
552                    dataFile = getNextDataFile(dataFile);
553                    if (dataFile == null) {
554                        return null;
555                    } else {
556                        cur.setDataFileId(dataFile.getDataFileId().intValue());
557                        cur.setOffset(0);
558                    }
559                }
560    
561                // Load in location size and type.
562                DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
563                try {
564                    reader.readLocationDetails(cur);
565                } finally {
566                    accessorPool.closeDataFileAccessor(reader);
567                }
568    
569                if (cur.getType() == 0) {
570                    return null;
571                } else if (cur.getType() > 0) {
572                    // Only return user records.
573                    return cur;
574                }
575            }
576        }
577        
578        public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{
579            DataFile df = fileByFileMap.get(file);
580            return getNextLocation(df, lastLocation,thisFileOnly);
581        }
582        
583        public synchronized Location getNextLocation(DataFile dataFile,
584                Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException {
585    
586            Location cur = null;
587            while (true) {
588                if (cur == null) {
589                    if (lastLocation == null) {
590                        DataFile head = (DataFile)dataFile.getHeadNode();
591                        cur = new Location();
592                        cur.setDataFileId(head.getDataFileId());
593                        cur.setOffset(0);
594                    } else {
595                        // Set to the next offset..
596                        cur = new Location(lastLocation);
597                        cur.setOffset(cur.getOffset() + cur.getSize());
598                    }
599                } else {
600                    cur.setOffset(cur.getOffset() + cur.getSize());
601                }
602    
603                
604                // Did it go into the next file??
605                if (dataFile.getLength() <= cur.getOffset()) {
606                    if (thisFileOnly) {
607                        return null;
608                    }else {
609                    dataFile = getNextDataFile(dataFile);
610                    if (dataFile == null) {
611                        return null;
612                    } else {
613                        cur.setDataFileId(dataFile.getDataFileId().intValue());
614                        cur.setOffset(0);
615                    }
616                    }
617                }
618    
619                // Load in location size and type.
620                DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
621                try {
622                    reader.readLocationDetails(cur);
623                } finally {
624                    accessorPool.closeDataFileAccessor(reader);
625                }
626    
627                if (cur.getType() == 0) {
628                    return null;
629                } else if (cur.getType() > 0) {
630                    // Only return user records.
631                    return cur;
632                }
633            }
634        }
635    
636        public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
637            DataFile dataFile = getDataFile(location);
638            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
639            ByteSequence rc = null;
640            try {
641                rc = reader.readRecord(location);
642            } finally {
643                accessorPool.closeDataFileAccessor(reader);
644            }
645            return rc;
646        }
647    
648        public void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
649            synchronized (this) {
650                mark = location;
651            }
652            storeState(sync);
653        }
654    
655        protected synchronized void storeState(boolean sync) throws IOException {
656            ByteSequence state = marshallState();
657            appender.storeItem(state, Location.MARK_TYPE, sync);
658            controlFile.store(state, sync);
659        }
660    
661        public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
662            Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
663            return loc;
664        }
665        
666        public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
667            Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
668            return loc;
669        }
670    
671        public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
672            return appender.storeItem(data, type, sync);
673        }
674    
675        public void update(Location location, ByteSequence data, boolean sync) throws IOException {
676            DataFile dataFile = getDataFile(location);
677            DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
678            try {
679                updater.updateRecord(location, data, sync);
680            } finally {
681                accessorPool.closeDataFileAccessor(updater);
682            }
683        }
684    
685        public File getDirectory() {
686            return directory;
687        }
688    
689        public void setDirectory(File directory) {
690            this.directory = directory;
691        }
692    
693        public String getFilePrefix() {
694            return filePrefix;
695        }
696    
697        public void setFilePrefix(String filePrefix) {
698            this.filePrefix = IOHelper.toFileSystemSafeName(filePrefix);
699        }
700    
701        public Map<WriteKey, WriteCommand> getInflightWrites() {
702            return inflightWrites;
703        }
704    
705        public Location getLastAppendLocation() {
706            return lastAppendLocation.get();
707        }
708    
709        public void setLastAppendLocation(Location lastSyncedLocation) {
710            this.lastAppendLocation.set(lastSyncedLocation);
711        }
712    
713            public boolean isUseNio() {
714                    return useNio;
715            }
716    
717            public void setUseNio(boolean useNio) {
718                    this.useNio = useNio;
719            }
720            
721            public File getDirectoryArchive() {
722            return directoryArchive;
723        }
724    
725        public void setDirectoryArchive(File directoryArchive) {
726            this.directoryArchive = directoryArchive;
727        }
728        
729        public boolean isArchiveDataLogs() {
730            return archiveDataLogs;
731        }
732    
733        public void setArchiveDataLogs(boolean archiveDataLogs) {
734            this.archiveDataLogs = archiveDataLogs;
735        }
736    
737        synchronized public Integer getCurrentDataFileId() {
738            if( currentWriteFile==null )
739                return null;
740            return currentWriteFile.getDataFileId();
741        }
742        
743        /**
744         * Get a set of files - only valid after start()
745         * @return files currently being used
746         */
747        public Set<File> getFiles(){
748            return fileByFileMap.keySet();
749        }
750    
751            synchronized public long getDiskSize() {
752                    long rc=0;
753            DataFile cur = (DataFile)currentWriteFile.getHeadNode();
754            while( cur !=null ) {
755                    rc += cur.getLength();
756                    cur = (DataFile) cur.getNext();
757            }
758                    return rc;
759            }
760    
761            synchronized public long getDiskSizeUntil(Location startPosition) {
762                    long rc=0;
763            DataFile cur = (DataFile)currentWriteFile.getHeadNode();
764            while( cur !=null ) {
765                    if( cur.getDataFileId().intValue() >= startPosition.getDataFileId() ) {
766                            return rc + startPosition.getOffset();
767                    }
768                    rc += cur.getLength();
769                    cur = (DataFile) cur.getNext();
770            }
771                    return rc;
772            }
773    
774    }