/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages DataFiles
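 *
 * A minimal usage sketch (illustrative only; the directory name and payload
 * are assumptions, not taken from this file):
 * <pre>
 * Journal journal = new Journal();
 * journal.setDirectory(new File("journal-data"));
 * journal.start();
 * Location loc = journal.write(new ByteSequence("payload".getBytes()), true);
 * ByteSequence data = journal.read(loc);
 * journal.close();
 * </pre>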
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int PREALLOC_CHUNK_SIZE = 1024*1024;

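    // On-disk record layout: a 4 byte length followed by a 1 byte record type,
    // then the payload. A batch control record additionally carries the magic
    // bytes below, a 4 byte batch size and an 8 byte Adler32 checksum of the
    // batch contents.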
    // RECORD_HEAD_SPACE = 4 byte length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // A batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
    // marker written at the logical end of a journal file (see getNextLocation)
    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
    public static final byte EOF_EOT = '4';
    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();

    private ScheduledExecutorService scheduler;

    // Handle corruption found at the recovery position (e.g. when checksums are
    // disabled, or a record was corrupted with zeros) while minimizing data loss.
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - skip to the next batch record or eof
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
            LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset((int) sequence.getLast() + 1);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);
        } catch (IOException e) {
            // ignore - this scan is best effort
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public DataFileAccessorPool getAccessorPool() {
        return accessorPool;
    }

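    /**
     * How journal files are preallocated on disk: as a sparse file, by copying
     * a preallocated template file via the OS kernel, by writing a full file of
     * zeros, or by writing zeros in fixed-size chunks (see the matching
     * doPreallocation* methods below).
     */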
    public enum PreallocationStrategy {
        SPARSE_FILE,
        OS_KERNEL_COPY,
        ZEROS,
        CHUNKED_ZEROS;
    }

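    /**
     * How much of the journal is preallocated: each whole data file up front,
     * the next data file ahead of time on a background task, or nothing.
     */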
    public enum PreallocationScope {
        ENTIRE_JOURNAL,
        ENTIRE_JOURNAL_ASYNC,
        NONE;
    }

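    /**
     * How journal writes are synced to disk: on every write, on a timed
     * interval, or never.
     */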
    public enum JournalDiskSyncStrategy {
        ALWAYS,
        PERIODIC,
        NEVER;
    }

    private static byte[] createBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    private static byte[] createEmptyBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            os.writeInt(0);
            os.writeLong(0L);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create empty batch control record header.", e);
        }
    }

    private static byte[] createEofBatchAndLocationRecord() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(EOF_INT);
            os.writeByte(EOF_EOT);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create eof header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive;
    private boolean directoryArchiveOverridden = false;

    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected ScheduledFuture<?> cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private int nextDataFileId = 1;
    private final Object dataFileIdLock = new Object();
    private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
    private volatile DataFile nextDataFile;

    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
    private File osKernelCopyTemplateFile = null;
    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;

    public interface DataFileRemovedListener {
        void fileRemoved(DataFile datafile);
    }

    private DataFileRemovedListener dataFileRemovedListener;

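    /**
     * Starts the journal: scans the directory for existing data files, links
     * them in id order, optionally runs a corruption check, recovers the last
     * append location and schedules the periodic cleanup task.
     */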
    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) {
            // create a template file that will be used to pre-allocate the journal files
            if (osKernelCopyTemplateFile == null) {
                osKernelCopyTemplateFile = createJournalTemplateFile();
            }
        }

        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: " + (end - start) + " ms");
    }

    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {

        if (PreallocationScope.NONE != preallocationScope) {

            if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                doPreallocationKernelCopy(file);
            } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                doPreallocationZeros(file);
            } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
                doPreallocationChunkedZeros(file);
            } else {
                doPreallocationSparseFile(file);
            }
        }
    }

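    // Sparse preallocation only writes the EOF record at the start and at the
    // very end of the file, leaving the filesystem to back the hole in between.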
    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
        try {
            FileChannel channel = file.getChannel();
            channel.position(0);
            channel.write(journalEof);
            channel.position(maxFileLength - EOF_RECORD.length);
            journalEof.rewind();
            channel.write(journalEof);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with sparse file", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
        buffer.put(EOF_RECORD);
        buffer.rewind();
        try {
            FileChannel channel = file.getChannel();
            channel.write(buffer);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
        try (RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw")) {
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw")) {
            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
        } catch (FileNotFoundException e) {
            // note: log rc here - osKernelCopyTemplateFile is not assigned until this method returns
            LOG.error("Could not create the journal template file at " + rc.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not initialize the journal template file at " + rc.getAbsolutePath(), e);
        }
        return rc;
    }

    private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {

        ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE);
        buffer.put(EOF_RECORD);
        buffer.rewind();

        try {
            FileChannel channel = file.getChannel();

            int remLen = maxFileLength;
            while (remLen > 0) {
                if (remLen < buffer.remaining()) {
                    buffer.limit(remLen);
                }
                int writeLen = channel.write(buffer);
                remLen -= writeLen;
                buffer.rewind();
            }

            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with chunked zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with chunked zeros! Will continue without preallocation", e);
        }
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

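    /**
     * @return true if the data file was preallocated asynchronously but never
     *         written to, i.e. its first record is already the journal EOF record.
     */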
    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
        int firstBatchRecordSize = -1;
        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
            Location location = new Location();
            location.setDataFileId(dataFile.getDataFileId());
            location.setOffset(0);

            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
            } catch (Exception ignored) {
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }
        }
        return firstBatchRecordSize == 0;
    }

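    /**
     * Scans the data file batch by batch from offset 0, recording any corrupted
     * ranges on the file, and returns the location just past the last valid
     * batch - the next safe append position.
     */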
    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            while (true) {
                int size = checkBatchRecord(reader, location.getOffset());
                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
                    if (size == 0) {
                        // eof batch record
                        break;
                    }
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {

                    // Perhaps it's just some corruption... scan through the
                    // file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
                    if (nextOffset >= 0) {
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException e) {
            // ignore - the scan stops at the last readable batch
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

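    /**
     * Scans forward from the given offset in 4KB chunks looking for the next
     * batch control record header.
     *
     * @return the offset of the next batch control record, or -1 if none is
     *         found before the end of the file.
     */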
    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        byte[] data = new byte[1024 * 4];
        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));

        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, pos);
            if (pos >= 0) {
                return offset + pos;
            } else {
                // need to load the next data chunk in..
                if (bs.length != data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                // overlap the reads by a header length so a header that straddles
                // the chunk boundary is still found
                offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
                bs = new ByteSequence(data, 0, reader.read(offset, data));
                pos = 0;
            }
        }
    }

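    /**
     * @return the size of the batch starting at the given offset, 0 if the
     *         offset holds the journal EOF record, or -1 if there is no valid
     *         batch control record at the offset (including an Adler32 checksum
     *         mismatch when checksums are enabled).
     */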
    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        byte[] controlRecord = new byte[BATCH_CONTROL_RECORD_SIZE];

        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord)) {

            reader.readFully(offset, controlRecord);

            // check for journal eof
            if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
                // eof batch
                return 0;
            }

            // Assert that it's a batch record.
            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                    return -1;
                }
            }

            int size = controlIs.readInt();
            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                return -1;
            }

            if (isChecksum()) {

                long expectedChecksum = controlIs.readLong();
                if (expectedChecksum == 0) {
                    // Checksumming was not enabled when the record was stored.
                    // we can't validate the record :(
                    return size;
                }

                byte[] data = new byte[size];
                reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);

                Checksum checksum = new Adler32();
                checksum.update(data, 0, data.length);

                if (expectedChecksum != checksum.getValue()) {
                    return -1;
                }
            }
            return size;
        }
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }

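    // Promote the next data file (preallocated in the background when the scope
    // is ENTIRE_JOURNAL_ASYNC) to the current write file and, if async
    // preallocation is enabled, kick off preallocation of the file after it.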
    private void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            DataFile dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();
            }
            synchronized (currentDataFile) {
                fileMap.put(dataFile.getDataFileId(), dataFile);
                fileByFileMap.put(dataFile.getFile(), dataFile);
                dataFiles.addLast(dataFile);
                currentDataFile.set(dataFile);
            }
            nextDataFile = null;
        }
        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
        }
    }

    private Runnable preAllocateNextDataFileTask = new Runnable() {
        @Override
        public void run() {
            if (nextDataFile == null) {
                synchronized (dataFileIdLock) {
                    try {
                        nextDataFile = newDataFile();
                    } catch (IOException e) {
                        LOG.warn("Failed to proactively allocate data file", e);
                    }
                }
            }
        }
    };

    private volatile Future<?> preAllocateNextDataFileFuture;

    private DataFile newDataFile() throws IOException {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
        return nextWriteFile;
    }

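    // Reserves a data file id out of band; the reserved file is linked in
    // before the tail so the current write file stays at the tail of the list.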
    public DataFile reserveDataFile() {
        synchronized (dataFileIdLock) {
            int nextNum = nextDataFileId++;
            File file = getFile(nextNum);
            DataFile reservedDataFile = new DataFile(file, nextNum);
            synchronized (currentDataFile) {
                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
                fileByFileMap.put(file, reservedDataFile);
                if (dataFiles.isEmpty()) {
                    dataFiles.addLast(reservedDataFile);
                } else {
                    dataFiles.getTail().linkBefore(reservedDataFile);
                }
            }
            return reservedDataFile;
        }
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = null;
        synchronized (currentDataFile) {
            dataFile = fileMap.get(key);
        }
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            cleanupTask.cancel(true);
            if (preAllocateNextDataFileFuture != null) {
                preAllocateNextDataFileFuture.cancel(true);
            }
            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
            accessorPool.close();
        }
        // the appender can be calling back to the journal, blocking a close - AMQ-5620
        appender.close();
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    public synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            result &= dataFile.delete();
        }

        if (preAllocateNextDataFileFuture != null) {
            preAllocateNextDataFileFuture.cancel(true);
        }
        synchronized (dataFileIdLock) {
            if (nextDataFile != null) {
                nextDataFile.delete();
                nextDataFile = null;
            }
        }

        totalLength.set(0);
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            lastAppendLocation.set(null);
            dataFiles = new LinkedNodeList<DataFile>();
        }
        // reopen open file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

    public void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = null;
            synchronized (currentDataFile) {
                dataFile = fileMap.remove(key);
                if (dataFile != null) {
                    fileByFileMap.remove(dataFile.getFile());
                    dataFile.unlink();
                }
            }
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        totalLength.addAndGet(-dataFile.getLength());
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                LOG.debug("Archive directory [{}] does not exist - creating it now",
                        directoryArchive.getAbsolutePath());
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {}", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file: {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }

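    /**
     * Walks forward from the given location (or from the head of the journal
     * when null) to the next user record, skipping corrupted ranges and jumping
     * to the next data file when an EOF record is hit.
     *
     * @return the next user record location, or null at the end of the journal.
     */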
    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = null;
                    synchronized (currentDataFile) {
                        head = dataFiles.getHead();
                    }
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    if (location.getSize() == -1) {
                        cur = new Location(location);
                    } else {
                        cur = new Location(location);
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file??
            if (dataFile.getLength() <= cur.getOffset()) {
                synchronized (currentDataFile) {
                    dataFile = dataFile.getNext();
                }
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if ((cur.getSize() == EOF_INT && cur.getType() == EOF_EOT) ||
                    (cur.getType() == 0 && cur.getSize() == 0)) {
                // eof - jump to next datafile
                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
                // replay of existing journals
                // possibly journal is larger than maxFileLength after config change
                cur.setSize(EOF_RECORD.length);
                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

    public ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
        return loc;
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
        return loc;
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() +
                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public DataFile getDataFileById(int dataFileId) {
        synchronized (currentDataFile) {
            return fileMap.get(Integer.valueOf(dataFileId));
        }
    }

    public DataFile getCurrentDataFile(int capacity) throws IOException {
        synchronized (currentDataFile) {
            if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
                rotateWriteFile();
            }
            return currentDataFile.get();
        }
    }

    public Integer getCurrentDataFileId() {
        synchronized (currentDataFile) {
            return currentDataFile.get().getDataFileId();
        }
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        synchronized (currentDataFile) {
            return fileByFileMap.keySet();
        }
    }

    public Map<Integer, DataFile> getFileMap() {
        synchronized (currentDataFile) {
            return new TreeMap<Integer, DataFile>(fileMap);
        }
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }

    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }

    public boolean isJournalDiskSyncPeriodic() {
        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }

    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }

    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int)(file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey)obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}