001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.plist;
018    
019    import org.apache.activemq.broker.BrokerService;
020    import org.apache.activemq.broker.BrokerServiceAware;
021    import org.apache.activemq.openwire.OpenWireFormat;
022    import org.apache.activemq.store.JournaledStore;
023    import org.apache.activemq.store.PList;
024    import org.apache.activemq.store.PListStore;
025    import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
026    import org.apache.activemq.store.kahadb.disk.journal.Journal;
027    import org.apache.activemq.store.kahadb.disk.journal.Location;
028    import org.apache.activemq.store.kahadb.disk.page.Page;
029    import org.apache.activemq.store.kahadb.disk.page.PageFile;
030    import org.apache.activemq.store.kahadb.disk.page.Transaction;
031    import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
032    import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
033    import org.apache.activemq.thread.Scheduler;
034    import org.apache.activemq.util.*;
035    import org.apache.activemq.wireformat.WireFormat;
036    import org.slf4j.Logger;
037    import org.slf4j.LoggerFactory;
038    
039    import java.io.DataInput;
040    import java.io.DataOutput;
041    import java.io.File;
042    import java.io.IOException;
043    import java.util.*;
044    import java.util.Map.Entry;
045    
046    /**
047     * @org.apache.xbean.XBean
048     */
049    public class PListStoreImpl extends ServiceSupport implements BrokerServiceAware, Runnable, PListStore, JournaledStore {
050        static final Logger LOG = LoggerFactory.getLogger(PListStoreImpl.class);
051        private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052    
053        static final int CLOSED_STATE = 1;
054        static final int OPEN_STATE = 2;
055    
056        private File directory;
057        PageFile pageFile;
058        private Journal journal;
059        private LockFile lockFile;
060        private boolean failIfDatabaseIsLocked;
061        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062        private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063        private boolean enableIndexWriteAsync = false;
064        private boolean initialized = false;
065        private boolean lazyInit = true;
066        // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
067        MetaData metaData = new MetaData(this);
068        final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
069        Map<String, PListImpl> persistentLists = new HashMap<String, PListImpl>();
070        final Object indexLock = new Object();
071        private Scheduler scheduler;
072        private long cleanupInterval = 30000;
073    
074        private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
075        private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
076        private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
077        private boolean indexEnablePageCaching = true;
078    
079        public Object getIndexLock() {
080            return indexLock;
081        }
082    
083        @Override
084        public void setBrokerService(BrokerService brokerService) {
085            this.scheduler = brokerService.getScheduler();
086        }
087    
088        public int getIndexPageSize() {
089            return indexPageSize;
090        }
091    
092        public int getIndexCacheSize() {
093            return indexCacheSize;
094        }
095    
096        public int getIndexWriteBatchSize() {
097            return indexWriteBatchSize;
098        }
099    
100        public void setIndexPageSize(int indexPageSize) {
101            this.indexPageSize = indexPageSize;
102        }
103    
104        public void setIndexCacheSize(int indexCacheSize) {
105            this.indexCacheSize = indexCacheSize;
106        }
107    
108        public void setIndexWriteBatchSize(int indexWriteBatchSize) {
109            this.indexWriteBatchSize = indexWriteBatchSize;
110        }
111    
112        public boolean getIndexEnablePageCaching() {
113            return indexEnablePageCaching;
114        }
115    
116        public void setIndexEnablePageCaching(boolean indexEnablePageCaching) {
117            this.indexEnablePageCaching = indexEnablePageCaching;
118        }
119    
120        protected class MetaData {
121            protected MetaData(PListStoreImpl store) {
122                this.store = store;
123            }
124    
125            private final PListStoreImpl store;
126            Page<MetaData> page;
127            BTreeIndex<String, PListImpl> lists;
128    
129            void createIndexes(Transaction tx) throws IOException {
130                this.lists = new BTreeIndex<String, PListImpl>(pageFile, tx.allocate().getPageId());
131            }
132    
133            void load(Transaction tx) throws IOException {
134                this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
135                this.lists.setValueMarshaller(new PListMarshaller(this.store));
136                this.lists.load(tx);
137            }
138    
139            void loadLists(Transaction tx, Map<String, PListImpl> lists) throws IOException {
140                for (Iterator<Entry<String, PListImpl>> i = this.lists.iterator(tx); i.hasNext();) {
141                    Entry<String, PListImpl> entry = i.next();
142                    entry.getValue().load(tx);
143                    lists.put(entry.getKey(), entry.getValue());
144                }
145            }
146    
147            public void read(DataInput is) throws IOException {
148                this.lists = new BTreeIndex<String, PListImpl>(pageFile, is.readLong());
149                this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
150                this.lists.setValueMarshaller(new PListMarshaller(this.store));
151            }
152    
153            public void write(DataOutput os) throws IOException {
154                os.writeLong(this.lists.getPageId());
155            }
156        }
157    
158        class MetaDataMarshaller extends VariableMarshaller<MetaData> {
159            private final PListStoreImpl store;
160    
161            MetaDataMarshaller(PListStoreImpl store) {
162                this.store = store;
163            }
164            public MetaData readPayload(DataInput dataIn) throws IOException {
165                MetaData rc = new MetaData(this.store);
166                rc.read(dataIn);
167                return rc;
168            }
169    
170            public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
171                object.write(dataOut);
172            }
173        }
174    
175        class PListMarshaller extends VariableMarshaller<PListImpl> {
176            private final PListStoreImpl store;
177            PListMarshaller(PListStoreImpl store) {
178                this.store = store;
179            }
180            public PListImpl readPayload(DataInput dataIn) throws IOException {
181                PListImpl result = new PListImpl(this.store);
182                result.read(dataIn);
183                return result;
184            }
185    
186            public void writePayload(PListImpl list, DataOutput dataOut) throws IOException {
187                list.write(dataOut);
188            }
189        }
190    
191        public Journal getJournal() {
192            return this.journal;
193        }
194    
195        @Override
196        public File getDirectory() {
197            return directory;
198        }
199    
200        @Override
201        public void setDirectory(File directory) {
202            this.directory = directory;
203        }
204    
205        public long size() {
206            synchronized (this) {
207                if (!initialized) {
208                    return 0;
209                }
210            }
211            try {
212                return journal.getDiskSize() + pageFile.getDiskSize();
213            } catch (IOException e) {
214                throw new RuntimeException(e);
215            }
216        }
217    
218        @Override
219        public PListImpl getPList(final String name) throws Exception {
220            if (!isStarted()) {
221                throw new IllegalStateException("Not started");
222            }
223            intialize();
224            synchronized (indexLock) {
225                synchronized (this) {
226                    PListImpl result = this.persistentLists.get(name);
227                    if (result == null) {
228                        final PListImpl pl = new PListImpl(this);
229                        pl.setName(name);
230                        getPageFile().tx().execute(new Transaction.Closure<IOException>() {
231                            public void execute(Transaction tx) throws IOException {
232                                pl.setHeadPageId(tx.allocate().getPageId());
233                                pl.load(tx);
234                                metaData.lists.put(tx, name, pl);
235                            }
236                        });
237                        result = pl;
238                        this.persistentLists.put(name, pl);
239                    }
240                    final PListImpl toLoad = result;
241                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
242                        public void execute(Transaction tx) throws IOException {
243                            toLoad.load(tx);
244                        }
245                    });
246    
247                    return result;
248                }
249            }
250        }
251    
252        @Override
253        public boolean removePList(final String name) throws Exception {
254            boolean result = false;
255            synchronized (indexLock) {
256                synchronized (this) {
257                    final PList pl = this.persistentLists.remove(name);
258                    result = pl != null;
259                    if (result) {
260                        getPageFile().tx().execute(new Transaction.Closure<IOException>() {
261                            public void execute(Transaction tx) throws IOException {
262                                metaData.lists.remove(tx, name);
263                                pl.destroy();
264                            }
265                        });
266                    }
267                }
268            }
269            return result;
270        }
271    
272        protected synchronized void intialize() throws Exception {
273            if (isStarted()) {
274                if (this.initialized == false) {
275                    if (this.directory == null) {
276                        this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
277                    }
278                    IOHelper.mkdirs(this.directory);
279                    lock();
280                    this.journal = new Journal();
281                    this.journal.setDirectory(directory);
282                    this.journal.setMaxFileLength(getJournalMaxFileLength());
283                    this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
284                    this.journal.start();
285                    this.pageFile = new PageFile(directory, "tmpDB");
286                    this.pageFile.setEnablePageCaching(getIndexEnablePageCaching());
287                    this.pageFile.setPageSize(getIndexPageSize());
288                    this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
289                    this.pageFile.setPageCacheSize(getIndexCacheSize());
290                    this.pageFile.load();
291    
292                    this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
293                        public void execute(Transaction tx) throws IOException {
294                            if (pageFile.getPageCount() == 0) {
295                                Page<MetaData> page = tx.allocate();
296                                assert page.getPageId() == 0;
297                                page.set(metaData);
298                                metaData.page = page;
299                                metaData.createIndexes(tx);
300                                tx.store(metaData.page, metaDataMarshaller, true);
301    
302                            } else {
303                                Page<MetaData> page = tx.load(0, metaDataMarshaller);
304                                metaData = page.get();
305                                metaData.page = page;
306                            }
307                            metaData.load(tx);
308                            metaData.loadLists(tx, persistentLists);
309                        }
310                    });
311                    this.pageFile.flush();
312    
313                    if (cleanupInterval > 0) {
314                        if (scheduler == null) {
315                            scheduler = new Scheduler(PListStoreImpl.class.getSimpleName());
316                            scheduler.start();
317                        }
318                        scheduler.executePeriodically(this, cleanupInterval);
319                    }
320                    this.initialized = true;
321                    LOG.info(this + " initialized");
322                }
323            }
324        }
325    
326        @Override
327        protected synchronized void doStart() throws Exception {
328            if (!lazyInit) {
329                intialize();
330            }
331            LOG.info(this + " started");
332        }
333    
334        @Override
335        protected synchronized void doStop(ServiceStopper stopper) throws Exception {
336            if (scheduler != null) {
337                if (PListStoreImpl.class.getSimpleName().equals(scheduler.getName())) {
338                    scheduler.stop();
339                    scheduler = null;
340                }
341            }
342            for (PListImpl pl : this.persistentLists.values()) {
343                pl.unload(null);
344            }
345            if (this.pageFile != null) {
346                this.pageFile.unload();
347            }
348            if (this.journal != null) {
349                journal.close();
350            }
351            if (this.lockFile != null) {
352                this.lockFile.unlock();
353            }
354            this.lockFile = null;
355            this.initialized = false;
356            LOG.info(this + " stopped");
357    
358        }
359    
360        public void run() {
361            try {
362                if (isStopping()) {
363                    return;
364                }
365                final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId();
366                final Set<Integer> candidates = journal.getFileMap().keySet();
367                LOG.trace("Full gc candidate set:" + candidates);
368                if (candidates.size() > 1) {
369                    // prune current write
370                    for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) {
371                        if (iterator.next() >= lastJournalFileId) {
372                            iterator.remove();
373                        }
374                    }
375                    List<PListImpl> plists = null;
376                    synchronized (indexLock) {
377                        synchronized (this) {
378                            plists = new ArrayList<PListImpl>(persistentLists.values());
379                        }
380                    }
381                    for (PListImpl list : plists) {
382                        list.claimFileLocations(candidates);
383                        if (isStopping()) {
384                            return;
385                        }
386                        LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
387                    }
388                    LOG.trace("GC Candidate set:" + candidates);
389                    this.journal.removeDataFiles(candidates);
390                }
391            } catch (IOException e) {
392                LOG.error("Exception on periodic cleanup: " + e, e);
393            }
394        }
395    
396        ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
397            ByteSequence result = null;
398            result = this.journal.read(location);
399            return result;
400        }
401    
402        Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
403            return this.journal.write(payload, sync);
404        }
405    
406        private void lock() throws IOException {
407            if (lockFile == null) {
408                File lockFileName = new File(directory, "lock");
409                lockFile = new LockFile(lockFileName, true);
410                if (failIfDatabaseIsLocked) {
411                    lockFile.lock();
412                } else {
413                    while (true) {
414                        try {
415                            lockFile.lock();
416                            break;
417                        } catch (IOException e) {
418                            LOG.info("Database " + lockFileName + " is locked... waiting "
419                                    + (DATABASE_LOCKED_WAIT_DELAY / 1000)
420                                    + " seconds for the database to be unlocked. Reason: " + e);
421                            try {
422                                Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
423                            } catch (InterruptedException e1) {
424                            }
425                        }
426                    }
427                }
428            }
429        }
430    
431        PageFile getPageFile() {
432            this.pageFile.isLoaded();
433            return this.pageFile;
434        }
435    
436        public boolean isFailIfDatabaseIsLocked() {
437            return failIfDatabaseIsLocked;
438        }
439    
440        public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
441            this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
442        }
443    
444        public int getJournalMaxFileLength() {
445            return journalMaxFileLength;
446        }
447    
448        public void setJournalMaxFileLength(int journalMaxFileLength) {
449            this.journalMaxFileLength = journalMaxFileLength;
450        }
451    
452        public int getJournalMaxWriteBatchSize() {
453            return journalMaxWriteBatchSize;
454        }
455    
456        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
457            this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
458        }
459    
460        public boolean isEnableIndexWriteAsync() {
461            return enableIndexWriteAsync;
462        }
463    
464        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
465            this.enableIndexWriteAsync = enableIndexWriteAsync;
466        }
467    
468        public long getCleanupInterval() {
469            return cleanupInterval;
470        }
471    
472        public void setCleanupInterval(long cleanupInterval) {
473            this.cleanupInterval = cleanupInterval;
474        }
475    
476        public boolean isLazyInit() {
477            return lazyInit;
478        }
479    
480        public void setLazyInit(boolean lazyInit) {
481            this.lazyInit = lazyInit;
482        }
483    
484        @Override
485        public String toString() {
486            String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
487            return "PListStore:[" + path + "]";
488        }
489    }