001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.File;
022    import java.io.IOException;
023    import java.io.InputStream;
024    import java.io.OutputStream;
025    import java.util.ArrayList;
026    import java.util.HashMap;
027    import java.util.HashSet;
028    import java.util.Iterator;
029    import java.util.LinkedHashMap;
030    import java.util.TreeMap;
031    import java.util.Map.Entry;
032    import java.util.concurrent.atomic.AtomicBoolean;
033    
034    import org.apache.activemq.command.SubscriptionInfo;
035    import org.apache.activemq.command.TransactionId;
036    import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
037    import org.apache.activemq.store.kahadb.data.KahaDestination;
038    import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
039    import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
040    import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
041    import org.apache.activemq.util.ByteSequence;
042    import org.slf4j.Logger;
043    import org.slf4j.LoggerFactory;
044    import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
045    import org.apache.activemq.store.kahadb.disk.page.PageFile;
046    import org.apache.activemq.store.kahadb.disk.page.Transaction;
047    import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
048    import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
049    import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
050    
051    public class TempMessageDatabase {
052    
053        private static final Logger LOG = LoggerFactory.getLogger(TempMessageDatabase.class);
054    
055        public static final int CLOSED_STATE = 1;
056        public static final int OPEN_STATE = 2;
057    
058        protected BTreeIndex<String, StoredDestination> destinations;
059        protected PageFile pageFile;
060    
061        protected File directory;
062        
063        boolean enableIndexWriteAsync = true;
064        int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 
065        
066        protected AtomicBoolean started = new AtomicBoolean();
067        protected AtomicBoolean opened = new AtomicBoolean();
068    
069        public TempMessageDatabase() {
070        }
071    
072        public void start() throws Exception {
073            if (started.compareAndSet(false, true)) {
074                    load();
075            }
076        }
077    
078        public void stop() throws Exception {
079            if (started.compareAndSet(true, false)) {
080                unload();
081            }
082        }
083    
084            private void loadPageFile() throws IOException {
085                    synchronized (indexMutex) {
086                        final PageFile pageFile = getPageFile();
087                pageFile.load();
088                pageFile.tx().execute(new Transaction.Closure<IOException>() {
089                    public void execute(Transaction tx) throws IOException {
090                        destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
091                        destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
092                        destinations.setValueMarshaller(new StoredDestinationMarshaller());
093                        destinations.load(tx);
094                    }
095                });
096                pageFile.flush();
097                storedDestinations.clear();
098            }
099            }
100            
101            /**
102             * @throws IOException
103             */
104            public void open() throws IOException {
105                    if( opened.compareAndSet(false, true) ) {
106                    loadPageFile();
107                    }
108            }
109            
110        public void load() throws IOException {
111            synchronized (indexMutex) {
112                    open();
113                pageFile.unload();
114                pageFile.delete();
115                loadPageFile();
116            }
117        }
118    
119        
120            public void close() throws IOException, InterruptedException {
121                    if( opened.compareAndSet(true, false)) {
122                    synchronized (indexMutex) {
123                        pageFile.unload();
124                    }
125                    }
126            }
127            
128        public void unload() throws IOException, InterruptedException {
129            synchronized (indexMutex) {
130                if( pageFile.isLoaded() ) {
131                    close();
132                }
133            }
134        }
135    
136        public void processAdd(final KahaAddMessageCommand command, TransactionId txid, final ByteSequence data) throws IOException {
137            if (txid!=null) {
138                synchronized (indexMutex) {
139                    ArrayList<Operation> inflightTx = getInflightTx(txid);
140                    inflightTx.add(new AddOpperation(command, data));
141                }
142            } else {
143                synchronized (indexMutex) {
144                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
145                        public void execute(Transaction tx) throws IOException {
146                            upadateIndex(tx, command, data);
147                        }
148                    });
149                }
150            }
151        }
152    
153        public void processRemove(final KahaRemoveMessageCommand command, TransactionId txid) throws IOException {
154            if (txid!=null) {
155                synchronized (indexMutex) {
156                    ArrayList<Operation> inflightTx = getInflightTx(txid);
157                    inflightTx.add(new RemoveOpperation(command));
158                }
159            } else {
160                synchronized (indexMutex) {
161                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
162                        public void execute(Transaction tx) throws IOException {
163                            updateIndex(tx, command);
164                        }
165                    });
166                }
167            }
168    
169        }
170    
171        public void process(final KahaRemoveDestinationCommand command) throws IOException {
172            synchronized (indexMutex) {
173                pageFile.tx().execute(new Transaction.Closure<IOException>() {
174                    public void execute(Transaction tx) throws IOException {
175                        updateIndex(tx, command);
176                    }
177                });
178            }
179        }
180    
181        public void process(final KahaSubscriptionCommand command) throws IOException {
182            synchronized (indexMutex) {
183                pageFile.tx().execute(new Transaction.Closure<IOException>() {
184                    public void execute(Transaction tx) throws IOException {
185                        updateIndex(tx, command);
186                    }
187                });
188            }
189        }
190    
191        public void processCommit(TransactionId key) throws IOException {
192            synchronized (indexMutex) {
193                ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
194                if (inflightTx == null) {
195                    inflightTx = preparedTransactions.remove(key);
196                }
197                if (inflightTx == null) {
198                    return;
199                }
200    
201                final ArrayList<Operation> messagingTx = inflightTx;
202                pageFile.tx().execute(new Transaction.Closure<IOException>() {
203                    public void execute(Transaction tx) throws IOException {
204                        for (Operation op : messagingTx) {
205                            op.execute(tx);
206                        }
207                    }
208                });
209            }
210        }
211    
212        public void processPrepare(TransactionId key) {
213            synchronized (indexMutex) {
214                ArrayList<Operation> tx = inflightTransactions.remove(key);
215                if (tx != null) {
216                    preparedTransactions.put(key, tx);
217                }
218            }
219        }
220    
221        public void processRollback(TransactionId key) {
222            synchronized (indexMutex) {
223                ArrayList<Operation> tx = inflightTransactions.remove(key);
224                if (tx == null) {
225                    preparedTransactions.remove(key);
226                }
227            }
228        }
229    
230        // /////////////////////////////////////////////////////////////////
231        // These methods do the actual index updates.
232        // /////////////////////////////////////////////////////////////////
233    
234        protected final Object indexMutex = new Object();
235            private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
236    
237        private void upadateIndex(Transaction tx, KahaAddMessageCommand command, ByteSequence data) throws IOException {
238            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
239    
240            // Skip adding the message to the index if this is a topic and there are
241            // no subscriptions.
242            if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
243                return;
244            }
245    
246            // Add the message.
247            long id = sd.nextMessageId++;
248            Long previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
249            if( previous == null ) {
250                sd.orderIndex.put(tx, id, new MessageRecord(command.getMessageId(), data));
251            } else {
252                // restore the previous value.. Looks like this was a redo of a previously
253                // added message.  We don't want to assing it a new id as the other indexes would 
254                // be wrong..
255                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
256            }
257        }
258    
259        private void updateIndex(Transaction tx, KahaRemoveMessageCommand command) throws IOException {
260            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
261            if (!command.hasSubscriptionKey()) {
262                
263                // In the queue case we just remove the message from the index..
264                Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
265                if (sequenceId != null) {
266                    sd.orderIndex.remove(tx, sequenceId);
267                }
268            } else {
269                // In the topic case we need remove the message once it's been acked
270                // by all the subs
271                Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
272    
273                // Make sure it's a valid message id...
274                if (sequence != null) {
275                    String subscriptionKey = command.getSubscriptionKey();
276                    Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
277    
278                    // The following method handles deleting un-referenced messages.
279                    removeAckByteSequence(tx, sd, subscriptionKey, prev);
280    
281                    // Add it to the new location set.
282                    addAckByteSequence(sd, sequence, subscriptionKey);
283                }
284    
285            }
286        }
287    
288        private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command) throws IOException {
289            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
290            sd.orderIndex.clear(tx);
291            sd.orderIndex.unload(tx);
292            tx.free(sd.orderIndex.getPageId());
293            
294            sd.messageIdIndex.clear(tx);
295            sd.messageIdIndex.unload(tx);
296            tx.free(sd.messageIdIndex.getPageId());
297    
298            if (sd.subscriptions != null) {
299                sd.subscriptions.clear(tx);
300                sd.subscriptions.unload(tx);
301                tx.free(sd.subscriptions.getPageId());
302    
303                sd.subscriptionAcks.clear(tx);
304                sd.subscriptionAcks.unload(tx);
305                tx.free(sd.subscriptionAcks.getPageId());
306            }
307    
308            String key = key(command.getDestination());
309            storedDestinations.remove(key);
310            destinations.remove(tx, key);
311        }
312    
313        private void updateIndex(Transaction tx, KahaSubscriptionCommand command) throws IOException {
314            StoredDestination sd = getStoredDestination(command.getDestination(), tx);
315    
316            // If set then we are creating it.. otherwise we are destroying the sub
317            if (command.hasSubscriptionInfo()) {
318                String subscriptionKey = command.getSubscriptionKey();
319                sd.subscriptions.put(tx, subscriptionKey, command);
320                long ackByteSequence=-1;
321                if (!command.getRetroactive()) {
322                    ackByteSequence = sd.nextMessageId-1;
323                }
324    
325                sd.subscriptionAcks.put(tx, subscriptionKey, ackByteSequence);
326                addAckByteSequence(sd, ackByteSequence, subscriptionKey);
327            } else {
328                // delete the sub...
329                String subscriptionKey = command.getSubscriptionKey();
330                sd.subscriptions.remove(tx, subscriptionKey);
331                Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
332                if( prev!=null ) {
333                    removeAckByteSequence(tx, sd, subscriptionKey, prev);
334                }
335            }
336    
337        }
338        
339        public HashSet<Integer> getJournalFilesBeingReplicated() {
340                    return journalFilesBeingReplicated;
341            }
342    
343        // /////////////////////////////////////////////////////////////////
344        // StoredDestination related implementation methods.
345        // /////////////////////////////////////////////////////////////////
346    
347    
348            private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
349    
350        class StoredSubscription {
351            SubscriptionInfo subscriptionInfo;
352            String lastAckId;
353            ByteSequence lastAckByteSequence;
354            ByteSequence cursor;
355        }
356        
357        static class MessageRecord {
358            final String messageId;
359            final ByteSequence data;
360            
361            public MessageRecord(String messageId, ByteSequence location) {
362                this.messageId=messageId;
363                this.data=location;
364            }
365            
366            @Override
367            public String toString() {
368                return "["+messageId+","+data+"]";
369            }
370        }
371        
372        static protected class MessageKeysMarshaller extends VariableMarshaller<MessageRecord> {
373            static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
374            
375            public MessageRecord readPayload(DataInput dataIn) throws IOException {
376                return new MessageRecord(dataIn.readUTF(), ByteSequenceMarshaller.INSTANCE.readPayload(dataIn));
377            }
378    
379            public void writePayload(MessageRecord object, DataOutput dataOut) throws IOException {
380                dataOut.writeUTF(object.messageId);
381                ByteSequenceMarshaller.INSTANCE.writePayload(object.data, dataOut);
382            }
383        }
384        
385        static class StoredDestination {
386            long nextMessageId;
387            BTreeIndex<Long, MessageRecord> orderIndex;
388            BTreeIndex<String, Long> messageIdIndex;
389    
390            // These bits are only set for Topics
391            BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
392            BTreeIndex<String, Long> subscriptionAcks;
393            HashMap<String, Long> subscriptionCursors;
394            TreeMap<Long, HashSet<String>> ackPositions;
395        }
396    
397        protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
398            public Class<StoredDestination> getType() {
399                return StoredDestination.class;
400            }
401    
402            public StoredDestination readPayload(DataInput dataIn) throws IOException {
403                StoredDestination value = new StoredDestination();
404                value.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, dataIn.readLong());
405                value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
406    
407                if (dataIn.readBoolean()) {
408                    value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
409                    value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
410                }
411                return value;
412            }
413    
414            public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
415                dataOut.writeLong(value.orderIndex.getPageId());
416                dataOut.writeLong(value.messageIdIndex.getPageId());
417                if (value.subscriptions != null) {
418                    dataOut.writeBoolean(true);
419                    dataOut.writeLong(value.subscriptions.getPageId());
420                    dataOut.writeLong(value.subscriptionAcks.getPageId());
421                } else {
422                    dataOut.writeBoolean(false);
423                }
424            }
425        }
426    
427        static class ByteSequenceMarshaller extends VariableMarshaller<ByteSequence> {
428            final static ByteSequenceMarshaller INSTANCE = new ByteSequenceMarshaller();
429    
430            public ByteSequence readPayload(DataInput dataIn) throws IOException {
431                    byte data[] = new byte[dataIn.readInt()];
432                    dataIn.readFully(data);
433                return new ByteSequence(data);
434            }
435    
436            public void writePayload(ByteSequence object, DataOutput dataOut) throws IOException {
437                dataOut.writeInt(object.getLength());
438                dataOut.write(object.getData(), object.getOffset(), object.getLength());
439            }
440        }
441    
442        static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
443            final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
444    
445            public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
446                KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
447                rc.mergeFramed((InputStream)dataIn);
448                return rc;
449            }
450    
451            public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
452                object.writeFramed((OutputStream)dataOut);
453            }
454        }
455    
456        protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
457            String key = key(destination);
458            StoredDestination rc = storedDestinations.get(key);
459            if (rc == null) {
460                boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
461                rc = loadStoredDestination(tx, key, topic);
462                // Cache it. We may want to remove/unload destinations from the
463                // cache that are not used for a while
464                // to reduce memory usage.
465                storedDestinations.put(key, rc);
466            }
467            return rc;
468        }
469    
470        /**
471         * @param tx
472         * @param key
473         * @param topic
474         * @return
475         * @throws IOException
476         */
477        private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
478            // Try to load the existing indexes..
479            StoredDestination rc = destinations.get(tx, key);
480            if (rc == null) {
481                // Brand new destination.. allocate indexes for it.
482                rc = new StoredDestination();
483                rc.orderIndex = new BTreeIndex<Long, MessageRecord>(pageFile, tx.allocate());
484                rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
485    
486                if (topic) {
487                    rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
488                    rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
489                }
490                destinations.put(tx, key, rc);
491            }
492    
493            // Configure the marshalers and load.
494            rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
495            rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
496            rc.orderIndex.load(tx);
497    
498            // Figure out the next key using the last entry in the destination.
499            Entry<Long, MessageRecord> lastEntry = rc.orderIndex.getLast(tx);
500            if( lastEntry!=null ) {
501                rc.nextMessageId = lastEntry.getKey()+1;
502            }
503    
504            rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
505            rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
506            rc.messageIdIndex.load(tx);
507            
508            // If it was a topic...
509            if (topic) {
510    
511                rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
512                rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
513                rc.subscriptions.load(tx);
514    
515                rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
516                rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
517                rc.subscriptionAcks.load(tx);
518    
519                rc.ackPositions = new TreeMap<Long, HashSet<String>>();
520                rc.subscriptionCursors = new HashMap<String, Long>();
521    
522                for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
523                    Entry<String, Long> entry = iterator.next();
524                    addAckByteSequence(rc, entry.getValue(), entry.getKey());
525                }
526    
527            }
528            return rc;
529        }
530    
531        /**
532         * @param sd
533         * @param messageSequence
534         * @param subscriptionKey
535         */
536        private void addAckByteSequence(StoredDestination sd, Long messageSequence, String subscriptionKey) {
537            HashSet<String> hs = sd.ackPositions.get(messageSequence);
538            if (hs == null) {
539                hs = new HashSet<String>();
540                sd.ackPositions.put(messageSequence, hs);
541            }
542            hs.add(subscriptionKey);
543        }
544    
545        /**
546         * @param tx
547         * @param sd
548         * @param subscriptionKey
549         * @param sequenceId
550         * @throws IOException
551         */
552        private void removeAckByteSequence(Transaction tx, StoredDestination sd, String subscriptionKey, Long sequenceId) throws IOException {
553            // Remove the sub from the previous location set..
554            if (sequenceId != null) {
555                HashSet<String> hs = sd.ackPositions.get(sequenceId);
556                if (hs != null) {
557                    hs.remove(subscriptionKey);
558                    if (hs.isEmpty()) {
559                        HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
560                        sd.ackPositions.remove(sequenceId);
561    
562                        // Did we just empty out the first set in the
563                        // ordered list of ack locations? Then it's time to
564                        // delete some messages.
565                        if (hs == firstSet) {
566    
567                            // Find all the entries that need to get deleted.
568                            ArrayList<Entry<Long, MessageRecord>> deletes = new ArrayList<Entry<Long, MessageRecord>>();
569                            for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
570                                Entry<Long, MessageRecord> entry = iterator.next();
571                                if (entry.getKey().compareTo(sequenceId) <= 0) {
572                                    // We don't do the actually delete while we are
573                                    // iterating the BTree since
574                                    // iterating would fail.
575                                    deletes.add(entry);
576                                }
577                            }
578    
579                            // Do the actual deletes.
580                            for (Entry<Long, MessageRecord> entry : deletes) {
581                                sd.messageIdIndex.remove(tx,entry.getValue().messageId);
582                                sd.orderIndex.remove(tx,entry.getKey());
583                            }
584                        }
585                    }
586                }
587            }
588        }
589    
590        private String key(KahaDestination destination) {
591            return destination.getType().getNumber() + ":" + destination.getName();
592        }
593    
594        // /////////////////////////////////////////////////////////////////
595        // Transaction related implementation methods.
596        // /////////////////////////////////////////////////////////////////
597        protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
598        protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
599     
600        private ArrayList<Operation> getInflightTx(TransactionId key) {
601            ArrayList<Operation> tx = inflightTransactions.get(key);
602            if (tx == null) {
603                tx = new ArrayList<Operation>();
604                inflightTransactions.put(key, tx);
605            }
606            return tx;
607        }
608    
609        abstract class Operation {
610            abstract public void execute(Transaction tx) throws IOException;
611        }
612    
613        class AddOpperation extends Operation {
614            final KahaAddMessageCommand command;
615                    private final ByteSequence data;
616    
617            public AddOpperation(KahaAddMessageCommand command, ByteSequence location) {
618                this.command = command;
619                            this.data = location;
620            }
621    
622            public void execute(Transaction tx) throws IOException {
623                upadateIndex(tx, command, data);
624            }
625    
626            public KahaAddMessageCommand getCommand() {
627                return command;
628            }
629        }
630    
631        class RemoveOpperation extends Operation {
632            final KahaRemoveMessageCommand command;
633    
634            public RemoveOpperation(KahaRemoveMessageCommand command) {
635                this.command = command;
636            }
637    
638            public void execute(Transaction tx) throws IOException {
639                updateIndex(tx, command);
640            }
641    
642            public KahaRemoveMessageCommand getCommand() {
643                return command;
644            }
645        }
646    
647        // /////////////////////////////////////////////////////////////////
648        // Initialization related implementation methods.
649        // /////////////////////////////////////////////////////////////////
650    
651        private PageFile createPageFile() {
652            PageFile index = new PageFile(directory, "temp-db");
653            index.setEnableWriteThread(isEnableIndexWriteAsync());
654            index.setWriteBatchSize(getIndexWriteBatchSize());
655            index.setEnableDiskSyncs(false);
656            index.setEnableRecoveryFile(false);
657            return index;
658        }
659    
660        public File getDirectory() {
661            return directory;
662        }
663    
664        public void setDirectory(File directory) {
665            this.directory = directory;
666        }
667        
668        public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
669            this.setIndexWriteBatchSize = setIndexWriteBatchSize;
670        }
671    
672        public int getIndexWriteBatchSize() {
673            return setIndexWriteBatchSize;
674        }
675        
676        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
677            this.enableIndexWriteAsync = enableIndexWriteAsync;
678        }
679        
680        boolean isEnableIndexWriteAsync() {
681            return enableIndexWriteAsync;
682        }
683            
684        public PageFile getPageFile() {
685            if (pageFile == null) {
686                pageFile = createPageFile();
687            }
688                    return pageFile;
689            }
690    
691    }