001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.File;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.OutputStream;
025import java.util.ArrayList;
026import java.util.HashMap;
027import java.util.HashSet;
028import java.util.Iterator;
029import java.util.LinkedHashMap;
030import java.util.TreeMap;
031import java.util.Map.Entry;
032import java.util.concurrent.atomic.AtomicBoolean;
033
034import org.apache.activemq.command.SubscriptionInfo;
035import org.apache.activemq.command.TransactionId;
036import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
037import org.apache.activemq.store.kahadb.data.KahaDestination;
038import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
039import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
040import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
041import org.apache.activemq.util.ByteSequence;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
045import org.apache.activemq.store.kahadb.disk.page.PageFile;
046import org.apache.activemq.store.kahadb.disk.page.Transaction;
047import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
048import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
049import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
050
051public class TempMessageDatabase {
052
053    private static final Logger LOG = LoggerFactory.getLogger(TempMessageDatabase.class);
054
055    public static final int CLOSED_STATE = 1;
056    public static final int OPEN_STATE = 2;
057
058    protected BTreeIndex<String, StoredDestination> destinations;
059    protected PageFile pageFile;
060
061    protected File directory;
062    
063    boolean enableIndexWriteAsync = true;
064    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 
065    
066    protected AtomicBoolean started = new AtomicBoolean();
067    protected AtomicBoolean opened = new AtomicBoolean();
068
069    public TempMessageDatabase() {
070    }
071
072    public void start() throws Exception {
073        if (started.compareAndSet(false, true)) {
074                load();
075        }
076    }
077
078    public void stop() throws Exception {
079        if (started.compareAndSet(true, false)) {
080            unload();
081        }
082    }
083
084        private void loadPageFile() throws IOException {
085                synchronized (indexMutex) {
086                    final PageFile pageFile = getPageFile();
087            pageFile.load();
088            pageFile.tx().execute(new Transaction.Closure<IOException>() {
089                public void execute(Transaction tx) throws IOException {
090                    destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
091                    destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
092                    destinations.setValueMarshaller(new StoredDestinationMarshaller());
093                    destinations.load(tx);
094                }
095            });
096            pageFile.flush();
097            storedDestinations.clear();
098        }
099        }
100        
101        /**
102         * @throws IOException
103         */
104        public void open() throws IOException {
105                if( opened.compareAndSet(false, true) ) {
106                loadPageFile();
107                }
108        }
109        
110    public void load() throws IOException {
111        synchronized (indexMutex) {
112                open();
113            pageFile.unload();
114            pageFile.delete();
115            loadPageFile();
116        }
117    }
118
119    
120        public void close() throws IOException, InterruptedException {
121                if( opened.compareAndSet(true, false)) {
122                synchronized (indexMutex) {
123                    pageFile.unload();
124                }
125                }
126        }
127        
128    public void unload() throws IOException, InterruptedException {
129        synchronized (indexMutex) {
130            if( pageFile.isLoaded() ) {
131                close();
132            }
133        }
134    }
135
136    public void processAdd(final KahaAddMessageCommand command, TransactionId txid, final ByteSequence data) throws IOException {
137        if (txid!=null) {
138            synchronized (indexMutex) {
139                ArrayList<Operation> inflightTx = getInflightTx(txid);
140                inflightTx.add(new AddOpperation(command, data));
141            }
142        } else {
143            synchronized (indexMutex) {
144                pageFile.tx().execute(new Transaction.Closure<IOException>() {
145                    public void execute(Transaction tx) throws IOException {
146                        upadateIndex(tx, command, data);
147                    }
148                });
149            }
150        }
151    }
152
153    public void processRemove(final KahaRemoveMessageCommand command, TransactionId txid) throws IOException {
154        if (txid!=null) {
155            synchronized (indexMutex) {
156                ArrayList<Operation> inflightTx = getInflightTx(txid);
157                inflightTx.add(new RemoveOpperation(command));
158            }
159        } else {
160            synchronized (indexMutex) {
161                pageFile.tx().execute(new Transaction.Closure<IOException>() {
162                    public void execute(Transaction tx) throws IOException {
163                        updateIndex(tx, command);
164                    }
165                });
166            }
167        }
168
169    }
170
171    public void process(final KahaRemoveDestinationCommand command) throws IOException {
172        synchronized (indexMutex) {
173            pageFile.tx().execute(new Transaction.Closure<IOException>() {
174                public void execute(Transaction tx) throws IOException {
175                    updateIndex(tx, command);
176                }
177            });
178        }
179    }
180
181    public void process(final KahaSubscriptionCommand command) throws IOException {
182        synchronized (indexMutex) {
183            pageFile.tx().execute(new Transaction.Closure<IOException>() {
184                public void execute(Transaction tx) throws IOException {
185                    updateIndex(tx, command);
186                }
187            });
188        }
189    }
190
191    public void processCommit(TransactionId key) throws IOException {
192        synchronized (indexMutex) {
193            ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
194            if (inflightTx == null) {
195                inflightTx = preparedTransactions.remove(key);
196            }
197            if (inflightTx == null) {
198                return;
199            }
200
201            final ArrayList<Operation> messagingTx = inflightTx;
202            pageFile.tx().execute(new Transaction.Closure<IOException>() {
203                public void execute(Transaction tx) throws IOException {
204                    for (Operation op : messagingTx) {
205                        op.execute(tx);
206                    }
207                }
208            });
209        }
210    }
211
212    public void processPrepare(TransactionId key) {
213        synchronized (indexMutex) {
214            ArrayList<Operation> tx = inflightTransactions.remove(key);
215            if (tx != null) {
216                preparedTransactions.put(key, tx);
217            }
218        }
219    }
220
221    public void processRollback(TransactionId key) {
222        synchronized (indexMutex) {
223            ArrayList<Operation> tx = inflightTransactions.remove(key);
224            if (tx == null) {
225                preparedTransactions.remove(key);
226            }
227        }
228    }
229
230    // /////////////////////////////////////////////////////////////////
231    // These methods do the actual index updates.
232    // /////////////////////////////////////////////////////////////////
233
234    protected final Object indexMutex = new Object();
235        private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
236
237    private void upadateIndex(Transaction tx, KahaAddMessageCommand command, ByteSequence data) throws IOException {
238        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
239
240        // Skip adding the message to the index if this is a topic and there are
241        // no subscriptions.
242        if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
243            return;
244        }
245
246        // Add the message.
247        long id = sd.nextMessageId++;
248        Long previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
249        if( previous == null ) {
250            sd.orderIndex.put(tx, id, new MessageRecord(command.getMessageId(), data));
251        } else {
252            // restore the previous value.. Looks like this was a redo of a previously
253            // added message.  We don't want to assing it a new id as the other indexes would 
254            // be wrong..
255            sd.messageIdIndex.put(tx, command.getMessageId(), previous);
256        }
257    }
258
259    private void updateIndex(Transaction tx, KahaRemoveMessageCommand command) throws IOException {
260        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
261        if (!command.hasSubscriptionKey()) {
262            
263            // In the queue case we just remove the message from the index..
264            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
265            if (sequenceId != null) {
266                sd.orderIndex.remove(tx, sequenceId);
267            }
268        } else {
269            // In the topic case we need remove the message once it's been acked
270            // by all the subs
271            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
272
273            // Make sure it's a valid message id...
274            if (sequence != null) {
275                String subscriptionKey = command.getSubscriptionKey();
276                Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
277
278                // The following method handles deleting un-referenced messages.
279                removeAckByteSequence(tx, sd, subscriptionKey, prev);
280
281                // Add it to the new location set.
282                addAckByteSequence(sd, sequence, subscriptionKey);
283            }
284
285        }
286    }
287
288    private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command) throws IOException {
289        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
290        sd.orderIndex.clear(tx);
291        sd.orderIndex.unload(tx);
292        tx.free(sd.orderIndex.getPageId());
293        
294        sd.messageIdIndex.clear(tx);
295        sd.messageIdIndex.unload(tx);
296        tx.free(sd.messageIdIndex.getPageId());
297
298        if (sd.subscriptions != null) {
299            sd.subscriptions.clear(tx);
300            sd.subscriptions.unload(tx);
301            tx.free(sd.subscriptions.getPageId());
302
303            sd.subscriptionAcks.clear(tx);
304            sd.subscriptionAcks.unload(tx);
305            tx.free(sd.subscriptionAcks.getPageId());
306        }
307
308        String key = key(command.getDestination());
309        storedDestinations.remove(key);
310        destinations.remove(tx, key);
311    }
312
313    private void updateIndex(Transaction tx, KahaSubscriptionCommand command) throws IOException {
314        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
315
316        // If set then we are creating it.. otherwise we are destroying the sub
317        if (command.hasSubscriptionInfo()) {
318            String subscriptionKey = command.getSubscriptionKey();
319            sd.subscriptions.put(tx, subscriptionKey, command);
320            long ackByteSequence=-1;
321            if (!command.getRetroactive()) {
322                ackByteSequence = sd.nextMessageId-1;
323            }
324
325            sd.subscriptionAcks.put(tx, subscriptionKey, ackByteSequence);
326            addAckByteSequence(sd, ackByteSequence, subscriptionKey);
327        } else {
328            // delete the sub...
329            String subscriptionKey = command.getSubscriptionKey();
330            sd.subscriptions.remove(tx, subscriptionKey);
331            Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
332            if( prev!=null ) {
333                removeAckByteSequence(tx, sd, subscriptionKey, prev);
334            }
335        }
336
337    }
338    
339    public HashSet<Integer> getJournalFilesBeingReplicated() {
340                return journalFilesBeingReplicated;
341        }
342
343    // /////////////////////////////////////////////////////////////////
344    // StoredDestination related implementation methods.
345    // /////////////////////////////////////////////////////////////////
346
347
348        private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
349
350    class StoredSubscription {
351        SubscriptionInfo subscriptionInfo;
352        String lastAckId;
353        ByteSequence lastAckByteSequence;
354        ByteSequence cursor;
355    }
356    
357    static class MessageRecord {
358        final String messageId;
359        final ByteSequence data;
360        
361        public MessageRecord(String messageId, ByteSequence location) {
362            this.messageId=messageId;
363            this.data=location;
364        }
365        
366        @Override
367        public String toString() {
368            return "["+messageId+","+data+"]";
369        }
370    }
371    
372    static protected class MessageKeysMarshaller extends VariableMarshaller<MessageRecord> {
373        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
374        
375        public MessageRecord readPayload(DataInput dataIn) throws IOException {
376            return new MessageRecord(dataIn.readUTF(), ByteSequenceMarshaller.INSTANCE.readPayload(dataIn));
377        }
378
379        public void writePayload(MessageRecord object, DataOutput dataOut) throws IOException {
380            dataOut.writeUTF(object.messageId);
381            ByteSequenceMarshaller.INSTANCE.writePayload(object.data, dataOut);
382        }
383    }
384    
385    static class StoredDestination {
386        long nextMessageId;
387        BTreeIndex<Long, MessageRecord> orderIndex;
388        BTreeIndex<String, Long> messageIdIndex;
389
390        // These bits are only set for Topics
391        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
392        BTreeIndex<String, Long> subscriptionAcks;
393        HashMap<String, Long> subscriptionCursors;
394        TreeMap<Long, HashSet<String>> ackPositions;
395    }
396
397    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
398        public Class<StoredDestination> getType() {
399            return StoredDestination.class;
400        }
401
402        public StoredDestination readPayload(DataInput dataIn) throws IOException {
403            StoredDestination value = new StoredDestination();
404            value.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, dataIn.readLong());
405            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
406
407            if (dataIn.readBoolean()) {
408                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
409                value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
410            }
411            return value;
412        }
413
414        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
415            dataOut.writeLong(value.orderIndex.getPageId());
416            dataOut.writeLong(value.messageIdIndex.getPageId());
417            if (value.subscriptions != null) {
418                dataOut.writeBoolean(true);
419                dataOut.writeLong(value.subscriptions.getPageId());
420                dataOut.writeLong(value.subscriptionAcks.getPageId());
421            } else {
422                dataOut.writeBoolean(false);
423            }
424        }
425    }
426
427    static class ByteSequenceMarshaller extends VariableMarshaller<ByteSequence> {
428        final static ByteSequenceMarshaller INSTANCE = new ByteSequenceMarshaller();
429
430        public ByteSequence readPayload(DataInput dataIn) throws IOException {
431                byte data[] = new byte[dataIn.readInt()];
432                dataIn.readFully(data);
433            return new ByteSequence(data);
434        }
435
436        public void writePayload(ByteSequence object, DataOutput dataOut) throws IOException {
437            dataOut.writeInt(object.getLength());
438            dataOut.write(object.getData(), object.getOffset(), object.getLength());
439        }
440    }
441
442    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
443        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
444
445        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
446            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
447            rc.mergeFramed((InputStream)dataIn);
448            return rc;
449        }
450
451        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
452            object.writeFramed((OutputStream)dataOut);
453        }
454    }
455
456    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
457        String key = key(destination);
458        StoredDestination rc = storedDestinations.get(key);
459        if (rc == null) {
460            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
461            rc = loadStoredDestination(tx, key, topic);
462            // Cache it. We may want to remove/unload destinations from the
463            // cache that are not used for a while
464            // to reduce memory usage.
465            storedDestinations.put(key, rc);
466        }
467        return rc;
468    }
469
470    /**
471     * @param tx
472     * @param key
473     * @param topic
474     * @return
475     * @throws IOException
476     */
477    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
478        // Try to load the existing indexes..
479        StoredDestination rc = destinations.get(tx, key);
480        if (rc == null) {
481            // Brand new destination.. allocate indexes for it.
482            rc = new StoredDestination();
483            rc.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, tx.allocate());
484            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
485
486            if (topic) {
487                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
488                rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
489            }
490            destinations.put(tx, key, rc);
491        }
492
493        // Configure the marshalers and load.
494        rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
495        rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
496        rc.orderIndex.load(tx);
497
498        // Figure out the next key using the last entry in the destination.
499        Entry<Long, MessageRecord> lastEntry = rc.orderIndex.getLast(tx);
500        if( lastEntry!=null ) {
501            rc.nextMessageId = lastEntry.getKey()+1;
502        }
503
504        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
505        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
506        rc.messageIdIndex.load(tx);
507        
508        // If it was a topic...
509        if (topic) {
510
511            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
512            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
513            rc.subscriptions.load(tx);
514
515            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
516            rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
517            rc.subscriptionAcks.load(tx);
518
519            rc.ackPositions = new TreeMap<Long, HashSet<String>>();
520            rc.subscriptionCursors = new HashMap<String, Long>();
521
522            for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
523                Entry<String, Long> entry = iterator.next();
524                addAckByteSequence(rc, entry.getValue(), entry.getKey());
525            }
526
527        }
528        return rc;
529    }
530
531    /**
532     * @param sd
533     * @param messageSequence
534     * @param subscriptionKey
535     */
536    private void addAckByteSequence(StoredDestination sd, Long messageSequence, String subscriptionKey) {
537        HashSet<String> hs = sd.ackPositions.get(messageSequence);
538        if (hs == null) {
539            hs = new HashSet<String>();
540            sd.ackPositions.put(messageSequence, hs);
541        }
542        hs.add(subscriptionKey);
543    }
544
545    /**
546     * @param tx
547     * @param sd
548     * @param subscriptionKey
549     * @param sequenceId
550     * @throws IOException
551     */
552    private void removeAckByteSequence(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
553        // Remove the sub from the previous location set..
554        if (sequenceId != null) {
555            HashSet<String> hs = sd.ackPositions.get(sequenceId);
556            if (hs != null) {
557                hs.remove(subscriptionKey);
558                if (hs.isEmpty()) {
559                    HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
560                    sd.ackPositions.remove(sequenceId);
561
562                    // Did we just empty out the first set in the
563                    // ordered list of ack locations? Then it's time to
564                    // delete some messages.
565                    if (hs == firstSet) {
566
567                        // Find all the entries that need to get deleted.
568                        ArrayList<Entry<Long, MessageRecord>> deletes = new ArrayList<Entry<Long, MessageRecord>>();
569                        for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
570                            Entry<Long, MessageRecord> entry = iterator.next();
571                            if (entry.getKey().compareTo(sequenceId) <= 0) {
572                                // We don't do the actually delete while we are
573                                // iterating the BTree since
574                                // iterating would fail.
575                                deletes.add(entry);
576                            }
577                        }
578
579                        // Do the actual deletes.
580                        for (Entry<Long, MessageRecord> entry : deletes) {
581                            sd.messageIdIndex.remove(tx,entry.getValue().messageId);
582                            sd.orderIndex.remove(tx,entry.getKey());
583                        }
584                    }
585                }
586            }
587        }
588    }
589
590    private String key(KahaDestination destination) {
591        return destination.getType().getNumber() + ":" + destination.getName();
592    }
593
594    // /////////////////////////////////////////////////////////////////
595    // Transaction related implementation methods.
596    // /////////////////////////////////////////////////////////////////
597    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
598    protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
599 
600    private ArrayList<Operation> getInflightTx(TransactionId key) {
601        ArrayList<Operation> tx = inflightTransactions.get(key);
602        if (tx == null) {
603            tx = new ArrayList<Operation>();
604            inflightTransactions.put(key, tx);
605        }
606        return tx;
607    }
608
609    abstract class Operation {
610        abstract public void execute(Transaction tx) throws IOException;
611    }
612
613    class AddOpperation extends Operation {
614        final KahaAddMessageCommand command;
615                private final ByteSequence data;
616
617        public AddOpperation(KahaAddMessageCommand command, ByteSequence location) {
618            this.command = command;
619                        this.data = location;
620        }
621
622        public void execute(Transaction tx) throws IOException {
623            upadateIndex(tx, command, data);
624        }
625
626        public KahaAddMessageCommand getCommand() {
627            return command;
628        }
629    }
630
631    class RemoveOpperation extends Operation {
632        final KahaRemoveMessageCommand command;
633
634        public RemoveOpperation(KahaRemoveMessageCommand command) {
635            this.command = command;
636        }
637
638        public void execute(Transaction tx) throws IOException {
639            updateIndex(tx, command);
640        }
641
642        public KahaRemoveMessageCommand getCommand() {
643            return command;
644        }
645    }
646
647    // /////////////////////////////////////////////////////////////////
648    // Initialization related implementation methods.
649    // /////////////////////////////////////////////////////////////////
650
651    private PageFile createPageFile() {
652        PageFile index = new PageFile(directory, "temp-db");
653        index.setEnableWriteThread(isEnableIndexWriteAsync());
654        index.setWriteBatchSize(getIndexWriteBatchSize());
655        index.setEnableDiskSyncs(false);
656        index.setEnableRecoveryFile(false);
657        return index;
658    }
659
660    public File getDirectory() {
661        return directory;
662    }
663
664    public void setDirectory(File directory) {
665        this.directory = directory;
666    }
667    
668    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
669        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
670    }
671
672    public int getIndexWriteBatchSize() {
673        return setIndexWriteBatchSize;
674    }
675    
676    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
677        this.enableIndexWriteAsync = enableIndexWriteAsync;
678    }
679    
680    boolean isEnableIndexWriteAsync() {
681        return enableIndexWriteAsync;
682    }
683        
684    public PageFile getPageFile() {
685        if (pageFile == null) {
686            pageFile = createPageFile();
687        }
688                return pageFile;
689        }
690
691}