001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.File;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.OutputStream;
025    import java.util.ArrayList;
026    import java.util.HashMap;
027    import java.util.HashSet;
028    import java.util.Iterator;
029    import java.util.LinkedHashMap;
030    import java.util.TreeMap;
031    import java.util.Map.Entry;
032    import java.util.concurrent.atomic.AtomicBoolean;
033    
034    import org.apache.activemq.command.SubscriptionInfo;
035    import org.apache.activemq.command.TransactionId;
036    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
037    import org.apache.activemq.store.kahadb.data.KahaDestination;
038    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
039    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
040    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
041    import org.apache.activemq.util.ByteSequence;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    import org.apache.kahadb.index.BTreeIndex;
045    import org.apache.kahadb.page.PageFile;
046    import org.apache.kahadb.page.Transaction;
047    import org.apache.kahadb.util.LongMarshaller;
048    import org.apache.kahadb.util.Marshaller;
049    import org.apache.kahadb.util.StringMarshaller;
050    import org.apache.kahadb.util.VariableMarshaller;
051    
052    public class TempMessageDatabase {
053    
054        private static final Logger LOG = LoggerFactory.getLogger(TempMessageDatabase.class);
055    
056        public static final int CLOSED_STATE = 1;
057        public static final int OPEN_STATE = 2;
058    
059        protected BTreeIndex<String, StoredDestination> destinations;
060        protected PageFile pageFile;
061    
062        protected File directory;
063        
064        boolean enableIndexWriteAsync = true;
065        int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 
066        
067        protected AtomicBoolean started = new AtomicBoolean();
068        protected AtomicBoolean opened = new AtomicBoolean();
069    
070        public TempMessageDatabase() {
071        }
072    
073        public void start() throws Exception {
074            if (started.compareAndSet(false, true)) {
075                    load();
076            }
077        }
078    
079        public void stop() throws Exception {
080            if (started.compareAndSet(true, false)) {
081                unload();
082            }
083        }
084    
085            private void loadPageFile() throws IOException {
086                    synchronized (indexMutex) {
087                        final PageFile pageFile = getPageFile();
088                pageFile.load();
089                pageFile.tx().execute(new Transaction.Closure<IOException>() {
090                    public void execute(Transaction tx) throws IOException {
091                        destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
092                        destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
093                        destinations.setValueMarshaller(new StoredDestinationMarshaller());
094                        destinations.load(tx);
095                    }
096                });
097                pageFile.flush();
098                storedDestinations.clear();
099            }
100            }
101            
102            /**
103             * @throws IOException
104             */
105            public void open() throws IOException {
106                    if( opened.compareAndSet(false, true) ) {
107                    loadPageFile();
108                    }
109            }
110            
111        public void load() throws IOException {
112            synchronized (indexMutex) {
113                    open();
114                pageFile.unload();
115                pageFile.delete();
116                loadPageFile();
117            }
118        }
119    
120        
121            public void close() throws IOException, InterruptedException {
122                    if( opened.compareAndSet(true, false)) {
123                    synchronized (indexMutex) {
124                        pageFile.unload();
125                    }
126                    }
127            }
128            
129        public void unload() throws IOException, InterruptedException {
130            synchronized (indexMutex) {
131                if( pageFile.isLoaded() ) {
132                    close();
133                }
134            }
135        }
136    
137        public void processAdd(final KahaAddMessageCommand command, TransactionId txid, final ByteSequence data) throws IOException {
138            if (txid!=null) {
139                synchronized (indexMutex) {
140                    ArrayList<Operation> inflightTx = getInflightTx(txid);
141                    inflightTx.add(new AddOpperation(command, data));
142                }
143            } else {
144                synchronized (indexMutex) {
145                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
146                        public void execute(Transaction tx) throws IOException {
147                            upadateIndex(tx, command, data);
148                        }
149                    });
150                }
151            }
152        }
153    
154        public void processRemove(final KahaRemoveMessageCommand command, TransactionId txid) throws IOException {
155            if (txid!=null) {
156                synchronized (indexMutex) {
157                    ArrayList<Operation> inflightTx = getInflightTx(txid);
158                    inflightTx.add(new RemoveOpperation(command));
159                }
160            } else {
161                synchronized (indexMutex) {
162                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
163                        public void execute(Transaction tx) throws IOException {
164                            updateIndex(tx, command);
165                        }
166                    });
167                }
168            }
169    
170        }
171    
172        public void process(final KahaRemoveDestinationCommand command) throws IOException {
173            synchronized (indexMutex) {
174                pageFile.tx().execute(new Transaction.Closure<IOException>() {
175                    public void execute(Transaction tx) throws IOException {
176                        updateIndex(tx, command);
177                    }
178                });
179            }
180        }
181    
182        public void process(final KahaSubscriptionCommand command) throws IOException {
183            synchronized (indexMutex) {
184                pageFile.tx().execute(new Transaction.Closure<IOException>() {
185                    public void execute(Transaction tx) throws IOException {
186                        updateIndex(tx, command);
187                    }
188                });
189            }
190        }
191    
192        public void processCommit(TransactionId key) throws IOException {
193            synchronized (indexMutex) {
194                ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
195                if (inflightTx == null) {
196                    inflightTx = preparedTransactions.remove(key);
197                }
198                if (inflightTx == null) {
199                    return;
200                }
201    
202                final ArrayList<Operation> messagingTx = inflightTx;
203                pageFile.tx().execute(new Transaction.Closure<IOException>() {
204                    public void execute(Transaction tx) throws IOException {
205                        for (Operation op : messagingTx) {
206                            op.execute(tx);
207                        }
208                    }
209                });
210            }
211        }
212    
213        public void processPrepare(TransactionId key) {
214            synchronized (indexMutex) {
215                ArrayList<Operation> tx = inflightTransactions.remove(key);
216                if (tx != null) {
217                    preparedTransactions.put(key, tx);
218                }
219            }
220        }
221    
222        public void processRollback(TransactionId key) {
223            synchronized (indexMutex) {
224                ArrayList<Operation> tx = inflightTransactions.remove(key);
225                if (tx == null) {
226                    preparedTransactions.remove(key);
227                }
228            }
229        }
230    
231        // /////////////////////////////////////////////////////////////////
232        // These methods do the actual index updates.
233        // /////////////////////////////////////////////////////////////////
234    
235        protected final Object indexMutex = new Object();
236            private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
237    
238        private void upadateIndex(Transaction tx, KahaAddMessageCommand command, ByteSequence data) throws IOException {
239            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
240    
241            // Skip adding the message to the index if this is a topic and there are
242            // no subscriptions.
243            if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
244                return;
245            }
246    
247            // Add the message.
248            long id = sd.nextMessageId++;
249            Long previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
250            if( previous == null ) {
251                sd.orderIndex.put(tx, id, new MessageRecord(command.getMessageId(), data));
252            } else {
253                // restore the previous value.. Looks like this was a redo of a previously
254                // added message.  We don't want to assing it a new id as the other indexes would 
255                // be wrong..
256                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
257            }
258        }
259    
260        private void updateIndex(Transaction tx, KahaRemoveMessageCommand command) throws IOException {
261            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
262            if (!command.hasSubscriptionKey()) {
263                
264                // In the queue case we just remove the message from the index..
265                Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
266                if (sequenceId != null) {
267                    sd.orderIndex.remove(tx, sequenceId);
268                }
269            } else {
270                // In the topic case we need remove the message once it's been acked
271                // by all the subs
272                Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
273    
274                // Make sure it's a valid message id...
275                if (sequence != null) {
276                    String subscriptionKey = command.getSubscriptionKey();
277                    Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
278    
279                    // The following method handles deleting un-referenced messages.
280                    removeAckByteSequence(tx, sd, subscriptionKey, prev);
281    
282                    // Add it to the new location set.
283                    addAckByteSequence(sd, sequence, subscriptionKey);
284                }
285    
286            }
287        }
288    
289        private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command) throws IOException {
290            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
291            sd.orderIndex.clear(tx);
292            sd.orderIndex.unload(tx);
293            tx.free(sd.orderIndex.getPageId());
294            
295            sd.messageIdIndex.clear(tx);
296            sd.messageIdIndex.unload(tx);
297            tx.free(sd.messageIdIndex.getPageId());
298    
299            if (sd.subscriptions != null) {
300                sd.subscriptions.clear(tx);
301                sd.subscriptions.unload(tx);
302                tx.free(sd.subscriptions.getPageId());
303    
304                sd.subscriptionAcks.clear(tx);
305                sd.subscriptionAcks.unload(tx);
306                tx.free(sd.subscriptionAcks.getPageId());
307            }
308    
309            String key = key(command.getDestination());
310            storedDestinations.remove(key);
311            destinations.remove(tx, key);
312        }
313    
314        private void updateIndex(Transaction tx, KahaSubscriptionCommand command) throws IOException {
315            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
316    
317            // If set then we are creating it.. otherwise we are destroying the sub
318            if (command.hasSubscriptionInfo()) {
319                String subscriptionKey = command.getSubscriptionKey();
320                sd.subscriptions.put(tx, subscriptionKey, command);
321                long ackByteSequence=-1;
322                if (!command.getRetroactive()) {
323                    ackByteSequence = sd.nextMessageId-1;
324                }
325    
326                sd.subscriptionAcks.put(tx, subscriptionKey, ackByteSequence);
327                addAckByteSequence(sd, ackByteSequence, subscriptionKey);
328            } else {
329                // delete the sub...
330                String subscriptionKey = command.getSubscriptionKey();
331                sd.subscriptions.remove(tx, subscriptionKey);
332                Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
333                if( prev!=null ) {
334                    removeAckByteSequence(tx, sd, subscriptionKey, prev);
335                }
336            }
337    
338        }
339        
340        public HashSet<Integer> getJournalFilesBeingReplicated() {
341                    return journalFilesBeingReplicated;
342            }
343    
344        // /////////////////////////////////////////////////////////////////
345        // StoredDestination related implementation methods.
346        // /////////////////////////////////////////////////////////////////
347    
348    
349            private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
350    
351        class StoredSubscription {
352            SubscriptionInfo subscriptionInfo;
353            String lastAckId;
354            ByteSequence lastAckByteSequence;
355            ByteSequence cursor;
356        }
357        
358        static class MessageRecord {
359            final String messageId;
360            final ByteSequence data;
361            
362            public MessageRecord(String messageId, ByteSequence location) {
363                this.messageId=messageId;
364                this.data=location;
365            }
366            
367            @Override
368            public String toString() {
369                return "["+messageId+","+data+"]";
370            }
371        }
372        
373        static protected class MessageKeysMarshaller extends VariableMarshaller<MessageRecord> {
374            static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
375            
376            public MessageRecord readPayload(DataInput dataIn) throws IOException {
377                return new MessageRecord(dataIn.readUTF(), ByteSequenceMarshaller.INSTANCE.readPayload(dataIn));
378            }
379    
380            public void writePayload(MessageRecord object, DataOutput dataOut) throws IOException {
381                dataOut.writeUTF(object.messageId);
382                ByteSequenceMarshaller.INSTANCE.writePayload(object.data, dataOut);
383            }
384        }
385        
386        static class StoredDestination {
387            long nextMessageId;
388            BTreeIndex<Long, MessageRecord> orderIndex;
389            BTreeIndex<String, Long> messageIdIndex;
390    
391            // These bits are only set for Topics
392            BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
393            BTreeIndex<String, Long> subscriptionAcks;
394            HashMap<String, Long> subscriptionCursors;
395            TreeMap<Long, HashSet<String>> ackPositions;
396        }
397    
398        protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
399            public Class<StoredDestination> getType() {
400                return StoredDestination.class;
401            }
402    
403            public StoredDestination readPayload(DataInput dataIn) throws IOException {
404                StoredDestination value = new StoredDestination();
405                value.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, dataIn.readLong());
406                value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
407    
408                if (dataIn.readBoolean()) {
409                    value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
410                    value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
411                }
412                return value;
413            }
414    
415            public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
416                dataOut.writeLong(value.orderIndex.getPageId());
417                dataOut.writeLong(value.messageIdIndex.getPageId());
418                if (value.subscriptions != null) {
419                    dataOut.writeBoolean(true);
420                    dataOut.writeLong(value.subscriptions.getPageId());
421                    dataOut.writeLong(value.subscriptionAcks.getPageId());
422                } else {
423                    dataOut.writeBoolean(false);
424                }
425            }
426        }
427    
428        static class ByteSequenceMarshaller extends VariableMarshaller<ByteSequence> {
429            final static ByteSequenceMarshaller INSTANCE = new ByteSequenceMarshaller();
430    
431            public ByteSequence readPayload(DataInput dataIn) throws IOException {
432                    byte data[] = new byte[dataIn.readInt()];
433                    dataIn.readFully(data);
434                return new ByteSequence(data);
435            }
436    
437            public void writePayload(ByteSequence object, DataOutput dataOut) throws IOException {
438                dataOut.writeInt(object.getLength());
439                dataOut.write(object.getData(), object.getOffset(), object.getLength());
440            }
441        }
442    
443        static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
444            final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
445    
446            public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
447                KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
448                rc.mergeFramed((InputStream)dataIn);
449                return rc;
450            }
451    
452            public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
453                object.writeFramed((OutputStream)dataOut);
454            }
455        }
456    
457        protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
458            String key = key(destination);
459            StoredDestination rc = storedDestinations.get(key);
460            if (rc == null) {
461                boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
462                rc = loadStoredDestination(tx, key, topic);
463                // Cache it. We may want to remove/unload destinations from the
464                // cache that are not used for a while
465                // to reduce memory usage.
466                storedDestinations.put(key, rc);
467            }
468            return rc;
469        }
470    
471        /**
472         * @param tx
473         * @param key
474         * @param topic
475         * @return
476         * @throws IOException
477         */
478        private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
479            // Try to load the existing indexes..
480            StoredDestination rc = destinations.get(tx, key);
481            if (rc == null) {
482                // Brand new destination.. allocate indexes for it.
483                rc = new StoredDestination();
484                rc.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, tx.allocate());
485                rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
486    
487                if (topic) {
488                    rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
489                    rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
490                }
491                destinations.put(tx, key, rc);
492            }
493    
494            // Configure the marshalers and load.
495            rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
496            rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
497            rc.orderIndex.load(tx);
498    
499            // Figure out the next key using the last entry in the destination.
500            Entry<Long, MessageRecord> lastEntry = rc.orderIndex.getLast(tx);
501            if( lastEntry!=null ) {
502                rc.nextMessageId = lastEntry.getKey()+1;
503            }
504    
505            rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
506            rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
507            rc.messageIdIndex.load(tx);
508            
509            // If it was a topic...
510            if (topic) {
511    
512                rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
513                rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
514                rc.subscriptions.load(tx);
515    
516                rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
517                rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
518                rc.subscriptionAcks.load(tx);
519    
520                rc.ackPositions = new TreeMap<Long, HashSet<String>>();
521                rc.subscriptionCursors = new HashMap<String, Long>();
522    
523                for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
524                    Entry<String, Long> entry = iterator.next();
525                    addAckByteSequence(rc, entry.getValue(), entry.getKey());
526                }
527    
528            }
529            return rc;
530        }
531    
532        /**
533         * @param sd
534         * @param messageSequence
535         * @param subscriptionKey
536         */
537        private void addAckByteSequence(StoredDestination sd, Long messageSequence, String subscriptionKey) {
538            HashSet<String> hs = sd.ackPositions.get(messageSequence);
539            if (hs == null) {
540                hs = new HashSet<String>();
541                sd.ackPositions.put(messageSequence, hs);
542            }
543            hs.add(subscriptionKey);
544        }
545    
546        /**
547         * @param tx
548         * @param sd
549         * @param subscriptionKey
550         * @param sequenceId
551         * @throws IOException
552         */
553        private void removeAckByteSequence(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
554            // Remove the sub from the previous location set..
555            if (sequenceId != null) {
556                HashSet<String> hs = sd.ackPositions.get(sequenceId);
557                if (hs != null) {
558                    hs.remove(subscriptionKey);
559                    if (hs.isEmpty()) {
560                        HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
561                        sd.ackPositions.remove(sequenceId);
562    
563                        // Did we just empty out the first set in the
564                        // ordered list of ack locations? Then it's time to
565                        // delete some messages.
566                        if (hs == firstSet) {
567    
568                            // Find all the entries that need to get deleted.
569                            ArrayList<Entry<Long, MessageRecord>> deletes = new ArrayList<Entry<Long, MessageRecord>>();
570                            for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
571                                Entry<Long, MessageRecord> entry = iterator.next();
572                                if (entry.getKey().compareTo(sequenceId) <= 0) {
573                                    // We don't do the actually delete while we are
574                                    // iterating the BTree since
575                                    // iterating would fail.
576                                    deletes.add(entry);
577                                }
578                            }
579    
580                            // Do the actual deletes.
581                            for (Entry<Long, MessageRecord> entry : deletes) {
582                                sd.messageIdIndex.remove(tx,entry.getValue().messageId);
583                                sd.orderIndex.remove(tx,entry.getKey());
584                            }
585                        }
586                    }
587                }
588            }
589        }
590    
591        private String key(KahaDestination destination) {
592            return destination.getType().getNumber() + ":" + destination.getName();
593        }
594    
595        // /////////////////////////////////////////////////////////////////
596        // Transaction related implementation methods.
597        // /////////////////////////////////////////////////////////////////
598        protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
599        protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
600     
601        private ArrayList<Operation> getInflightTx(TransactionId key) {
602            ArrayList<Operation> tx = inflightTransactions.get(key);
603            if (tx == null) {
604                tx = new ArrayList<Operation>();
605                inflightTransactions.put(key, tx);
606            }
607            return tx;
608        }
609    
610        abstract class Operation {
611            abstract public void execute(Transaction tx) throws IOException;
612        }
613    
614        class AddOpperation extends Operation {
615            final KahaAddMessageCommand command;
616                    private final ByteSequence data;
617    
618            public AddOpperation(KahaAddMessageCommand command, ByteSequence location) {
619                this.command = command;
620                            this.data = location;
621            }
622    
623            public void execute(Transaction tx) throws IOException {
624                upadateIndex(tx, command, data);
625            }
626    
627            public KahaAddMessageCommand getCommand() {
628                return command;
629            }
630        }
631    
632        class RemoveOpperation extends Operation {
633            final KahaRemoveMessageCommand command;
634    
635            public RemoveOpperation(KahaRemoveMessageCommand command) {
636                this.command = command;
637            }
638    
639            public void execute(Transaction tx) throws IOException {
640                updateIndex(tx, command);
641            }
642    
643            public KahaRemoveMessageCommand getCommand() {
644                return command;
645            }
646        }
647    
648        // /////////////////////////////////////////////////////////////////
649        // Initialization related implementation methods.
650        // /////////////////////////////////////////////////////////////////
651    
652        private PageFile createPageFile() {
653            PageFile index = new PageFile(directory, "temp-db");
654            index.setEnableWriteThread(isEnableIndexWriteAsync());
655            index.setWriteBatchSize(getIndexWriteBatchSize());
656            index.setEnableDiskSyncs(false);
657            index.setEnableRecoveryFile(false);
658            return index;
659        }
660    
661        public File getDirectory() {
662            return directory;
663        }
664    
665        public void setDirectory(File directory) {
666            this.directory = directory;
667        }
668        
669        public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
670            this.setIndexWriteBatchSize = setIndexWriteBatchSize;
671        }
672    
673        public int getIndexWriteBatchSize() {
674            return setIndexWriteBatchSize;
675        }
676        
677        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
678            this.enableIndexWriteAsync = enableIndexWriteAsync;
679        }
680        
681        boolean isEnableIndexWriteAsync() {
682            return enableIndexWriteAsync;
683        }
684            
685        public PageFile getPageFile() {
686            if (pageFile == null) {
687                pageFile = createPageFile();
688            }
689                    return pageFile;
690            }
691    
692    }