001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.plist;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.File;
022    import java.io.IOException;
023    import java.util.ArrayList;
024    import java.util.HashMap;
025    import java.util.Iterator;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Map.Entry;
029    import java.util.Set;
030    
031    import org.apache.activemq.broker.BrokerService;
032    import org.apache.activemq.broker.BrokerServiceAware;
033    import org.apache.activemq.thread.Scheduler;
034    import org.apache.activemq.util.IOHelper;
035    import org.apache.activemq.util.ServiceStopper;
036    import org.apache.activemq.util.ServiceSupport;
037    import org.apache.kahadb.index.BTreeIndex;
038    import org.apache.kahadb.journal.Journal;
039    import org.apache.kahadb.journal.Location;
040    import org.apache.kahadb.page.Page;
041    import org.apache.kahadb.page.PageFile;
042    import org.apache.kahadb.page.Transaction;
043    import org.apache.kahadb.util.ByteSequence;
044    import org.apache.kahadb.util.LockFile;
045    import org.apache.kahadb.util.StringMarshaller;
046    import org.apache.kahadb.util.VariableMarshaller;
047    import org.slf4j.Logger;
048    import org.slf4j.LoggerFactory;
049    
050    /**
051     * @org.apache.xbean.XBean
052     */
053    public class PListStore extends ServiceSupport implements BrokerServiceAware, Runnable {
054        static final Logger LOG = LoggerFactory.getLogger(PListStore.class);
055        private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
056    
057        static final int CLOSED_STATE = 1;
058        static final int OPEN_STATE = 2;
059    
060        private File directory;
061        PageFile pageFile;
062        private Journal journal;
063        private LockFile lockFile;
064        private boolean failIfDatabaseIsLocked;
065        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
066        private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
067        private boolean enableIndexWriteAsync = false;
068        private boolean initialized = false;
069        private boolean lazyInit = true;
070        // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
071        MetaData metaData = new MetaData(this);
072        final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
073        Map<String, PList> persistentLists = new HashMap<String, PList>();
074        final Object indexLock = new Object();
075        private Scheduler scheduler;
076        private long cleanupInterval = 30000;
077    
078        private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
079        private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
080        private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
081        private boolean indexEnablePageCaching = true;
082    
083        public Object getIndexLock() {
084            return indexLock;
085        }
086    
087        @Override
088        public void setBrokerService(BrokerService brokerService) {
089            this.scheduler = brokerService.getScheduler();
090        }
091    
092        public int getIndexPageSize() {
093            return indexPageSize;
094        }
095    
096        public int getIndexCacheSize() {
097            return indexCacheSize;
098        }
099    
100        public int getIndexWriteBatchSize() {
101            return indexWriteBatchSize;
102        }
103    
104        public void setIndexPageSize(int indexPageSize) {
105            this.indexPageSize = indexPageSize;
106        }
107    
108        public void setIndexCacheSize(int indexCacheSize) {
109            this.indexCacheSize = indexCacheSize;
110        }
111    
112        public void setIndexWriteBatchSize(int indexWriteBatchSize) {
113            this.indexWriteBatchSize = indexWriteBatchSize;
114        }
115    
116        public boolean getIndexEnablePageCaching() {
117            return indexEnablePageCaching;
118        }
119    
120        public void setIndexEnablePageCaching(boolean indexEnablePageCaching) {
121            this.indexEnablePageCaching = indexEnablePageCaching;
122        }
123    
124        protected class MetaData {
125            protected MetaData(PListStore store) {
126                this.store = store;
127            }
128    
129            private final PListStore store;
130            Page<MetaData> page;
131            BTreeIndex<String, PList> lists;
132    
133            void createIndexes(Transaction tx) throws IOException {
134                this.lists = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
135            }
136    
137            void load(Transaction tx) throws IOException {
138                this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
139                this.lists.setValueMarshaller(new PListMarshaller(this.store));
140                this.lists.load(tx);
141            }
142    
143            void loadLists(Transaction tx, Map<String, PList> lists) throws IOException {
144                for (Iterator<Entry<String, PList>> i = this.lists.iterator(tx); i.hasNext();) {
145                    Entry<String, PList> entry = i.next();
146                    entry.getValue().load(tx);
147                    lists.put(entry.getKey(), entry.getValue());
148                }
149            }
150    
151            public void read(DataInput is) throws IOException {
152                this.lists = new BTreeIndex<String, PList>(pageFile, is.readLong());
153                this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
154                this.lists.setValueMarshaller(new PListMarshaller(this.store));
155            }
156    
157            public void write(DataOutput os) throws IOException {
158                os.writeLong(this.lists.getPageId());
159            }
160        }
161    
162        class MetaDataMarshaller extends VariableMarshaller<MetaData> {
163            private final PListStore store;
164    
165            MetaDataMarshaller(PListStore store) {
166                this.store = store;
167            }
168            public MetaData readPayload(DataInput dataIn) throws IOException {
169                MetaData rc = new MetaData(this.store);
170                rc.read(dataIn);
171                return rc;
172            }
173    
174            public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
175                object.write(dataOut);
176            }
177        }
178    
179        class PListMarshaller extends VariableMarshaller<PList> {
180            private final PListStore store;
181            PListMarshaller(PListStore store) {
182                this.store = store;
183            }
184            public PList readPayload(DataInput dataIn) throws IOException {
185                PList result = new PList(this.store);
186                result.read(dataIn);
187                return result;
188            }
189    
190            public void writePayload(PList list, DataOutput dataOut) throws IOException {
191                list.write(dataOut);
192            }
193        }
194    
195        public Journal getJournal() {
196            return this.journal;
197        }
198    
199        public File getDirectory() {
200            return directory;
201        }
202    
203        public void setDirectory(File directory) {
204            this.directory = directory;
205        }
206    
207        public long size() {
208            synchronized (this) {
209                if (!initialized) {
210                    return 0;
211                }
212            }
213            try {
214                return journal.getDiskSize() + pageFile.getDiskSize();
215            } catch (IOException e) {
216                throw new RuntimeException(e);
217            }
218        }
219    
220        public PList getPList(final String name) throws Exception {
221            if (!isStarted()) {
222                throw new IllegalStateException("Not started");
223            }
224            intialize();
225            synchronized (indexLock) {
226                synchronized (this) {
227                    PList result = this.persistentLists.get(name);
228                    if (result == null) {
229                        final PList pl = new PList(this);
230                        pl.setName(name);
231                        getPageFile().tx().execute(new Transaction.Closure<IOException>() {
232                            public void execute(Transaction tx) throws IOException {
233                                pl.setHeadPageId(tx.allocate().getPageId());
234                                pl.load(tx);
235                                metaData.lists.put(tx, name, pl);
236                            }
237                        });
238                        result = pl;
239                        this.persistentLists.put(name, pl);
240                    }
241                    final PList toLoad = result;
242                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
243                        public void execute(Transaction tx) throws IOException {
244                            toLoad.load(tx);
245                        }
246                    });
247    
248                    return result;
249                }
250            }
251        }
252    
253        public boolean removePList(final String name) throws Exception {
254            boolean result = false;
255            synchronized (indexLock) {
256                synchronized (this) {
257                    final PList pl = this.persistentLists.remove(name);
258                    result = pl != null;
259                    if (result) {
260                        getPageFile().tx().execute(new Transaction.Closure<IOException>() {
261                            public void execute(Transaction tx) throws IOException {
262                                metaData.lists.remove(tx, name);
263                                pl.destroy();
264                            }
265                        });
266                    }
267                }
268            }
269            return result;
270        }
271    
272        protected synchronized void intialize() throws Exception {
273            if (isStarted()) {
274                if (this.initialized == false) {
275                    if (this.directory == null) {
276                        this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
277                    }
278                    IOHelper.mkdirs(this.directory);
279                    lock();
280                    this.journal = new Journal();
281                    this.journal.setDirectory(directory);
282                    this.journal.setMaxFileLength(getJournalMaxFileLength());
283                    this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
284                    this.journal.start();
285                    this.pageFile = new PageFile(directory, "tmpDB");
286                    this.pageFile.setEnablePageCaching(getIndexEnablePageCaching());
287                    this.pageFile.setPageSize(getIndexPageSize());
288                    this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
289                    this.pageFile.setPageCacheSize(getIndexCacheSize());
290                    this.pageFile.load();
291    
292                    this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
293                        public void execute(Transaction tx) throws IOException {
294                            if (pageFile.getPageCount() == 0) {
295                                Page<MetaData> page = tx.allocate();
296                                assert page.getPageId() == 0;
297                                page.set(metaData);
298                                metaData.page = page;
299                                metaData.createIndexes(tx);
300                                tx.store(metaData.page, metaDataMarshaller, true);
301    
302                            } else {
303                                Page<MetaData> page = tx.load(0, metaDataMarshaller);
304                                metaData = page.get();
305                                metaData.page = page;
306                            }
307                            metaData.load(tx);
308                            metaData.loadLists(tx, persistentLists);
309                        }
310                    });
311                    this.pageFile.flush();
312    
313                    if (cleanupInterval > 0) {
314                        if (scheduler == null) {
315                            scheduler = new Scheduler(PListStore.class.getSimpleName());
316                            scheduler.start();
317                        }
318                        scheduler.executePeriodically(this, cleanupInterval);
319                    }
320                    this.initialized = true;
321                    LOG.info(this + " initialized");
322                }
323            }
324        }
325    
326        @Override
327        protected synchronized void doStart() throws Exception {
328            if (!lazyInit) {
329                intialize();
330            }
331            LOG.info(this + " started");
332        }
333    
334        @Override
335        protected synchronized void doStop(ServiceStopper stopper) throws Exception {
336            if (scheduler != null) {
337                if (PListStore.class.getSimpleName().equals(scheduler.getName())) {
338                    scheduler.stop();
339                    scheduler = null;
340                }
341            }
342            for (PList pl : this.persistentLists.values()) {
343                pl.unload(null);
344            }
345            if (this.pageFile != null) {
346                this.pageFile.unload();
347            }
348            if (this.journal != null) {
349                journal.close();
350            }
351            if (this.lockFile != null) {
352                this.lockFile.unlock();
353            }
354            this.lockFile = null;
355            this.initialized = false;
356            LOG.info(this + " stopped");
357    
358        }
359    
360        public void run() {
361            try {
362                if (isStopping()) {
363                    return;
364                }
365                final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId();
366                final Set<Integer> candidates = journal.getFileMap().keySet();
367                LOG.trace("Full gc candidate set:" + candidates);
368                if (candidates.size() > 1) {
369                    // prune current write
370                    for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) {
371                        if (iterator.next() >= lastJournalFileId) {
372                            iterator.remove();
373                        }
374                    }
375                    List<PList> plists = null;
376                    synchronized (indexLock) {
377                        synchronized (this) {
378                            plists = new ArrayList<PList>(persistentLists.values());
379                        }
380                    }
381                    for (PList list : plists) {
382                        list.claimFileLocations(candidates);
383                        if (isStopping()) {
384                            return;
385                        }
386                        LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
387                    }
388                    LOG.trace("GC Candidate set:" + candidates);
389                    this.journal.removeDataFiles(candidates);
390                }
391            } catch (IOException e) {
392                LOG.error("Exception on periodic cleanup: " + e, e);
393            }
394        }
395    
396        ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
397            ByteSequence result = null;
398            result = this.journal.read(location);
399            return result;
400        }
401    
402        Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
403            return this.journal.write(payload, sync);
404        }
405    
406        private void lock() throws IOException {
407            if (lockFile == null) {
408                File lockFileName = new File(directory, "lock");
409                lockFile = new LockFile(lockFileName, true);
410                if (failIfDatabaseIsLocked) {
411                    lockFile.lock();
412                } else {
413                    while (true) {
414                        try {
415                            lockFile.lock();
416                            break;
417                        } catch (IOException e) {
418                            LOG.info("Database " + lockFileName + " is locked... waiting "
419                                    + (DATABASE_LOCKED_WAIT_DELAY / 1000)
420                                    + " seconds for the database to be unlocked. Reason: " + e);
421                            try {
422                                Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
423                            } catch (InterruptedException e1) {
424                            }
425                        }
426                    }
427                }
428            }
429        }
430    
431        PageFile getPageFile() {
432            this.pageFile.isLoaded();
433            return this.pageFile;
434        }
435    
436        public boolean isFailIfDatabaseIsLocked() {
437            return failIfDatabaseIsLocked;
438        }
439    
440        public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
441            this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
442        }
443    
444        public int getJournalMaxFileLength() {
445            return journalMaxFileLength;
446        }
447    
448        public void setJournalMaxFileLength(int journalMaxFileLength) {
449            this.journalMaxFileLength = journalMaxFileLength;
450        }
451    
452        public int getJournalMaxWriteBatchSize() {
453            return journalMaxWriteBatchSize;
454        }
455    
456        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
457            this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
458        }
459    
460        public boolean isEnableIndexWriteAsync() {
461            return enableIndexWriteAsync;
462        }
463    
464        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
465            this.enableIndexWriteAsync = enableIndexWriteAsync;
466        }
467    
468        public long getCleanupInterval() {
469            return cleanupInterval;
470        }
471    
472        public void setCleanupInterval(long cleanupInterval) {
473            this.cleanupInterval = cleanupInterval;
474        }
475    
476        public boolean isLazyInit() {
477            return lazyInit;
478        }
479    
480        public void setLazyInit(boolean lazyInit) {
481            this.lazyInit = lazyInit;
482        }
483    
484        @Override
485        public String toString() {
486            String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
487            return "PListStore:[" + path + "]";
488        }
489    }