001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.scheduler;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.File;
022    import java.io.IOException;
023    import java.util.ArrayList;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Map.Entry;
030    import java.util.Set;
031    import java.util.concurrent.atomic.AtomicLong;
032    
033    import org.apache.activemq.broker.scheduler.JobScheduler;
034    import org.apache.activemq.broker.scheduler.JobSchedulerStore;
035    import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
036    import org.apache.activemq.store.kahadb.disk.journal.Journal;
037    import org.apache.activemq.store.kahadb.disk.journal.Location;
038    import org.apache.activemq.store.kahadb.disk.page.Page;
039    import org.apache.activemq.store.kahadb.disk.page.PageFile;
040    import org.apache.activemq.store.kahadb.disk.page.Transaction;
041    import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
042    import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
043    import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
044    import org.apache.activemq.util.ByteSequence;
045    import org.apache.activemq.util.IOHelper;
046    import org.apache.activemq.util.LockFile;
047    import org.apache.activemq.util.ServiceStopper;
048    import org.apache.activemq.util.ServiceSupport;
049    import org.slf4j.Logger;
050    import org.slf4j.LoggerFactory;
051    
052    public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore {
053        static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
054        private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
055    
056        public static final int CLOSED_STATE = 1;
057        public static final int OPEN_STATE = 2;
058    
059        private File directory;
060        PageFile pageFile;
061        private Journal journal;
062        protected AtomicLong journalSize = new AtomicLong(0);
063        private LockFile lockFile;
064        private boolean failIfDatabaseIsLocked;
065        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
066        private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
067        private boolean enableIndexWriteAsync = false;
068        // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
069        MetaData metaData = new MetaData(this);
070        final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
071        Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
072    
073        protected class MetaData {
074            protected MetaData(JobSchedulerStoreImpl store) {
075                this.store = store;
076            }
077    
078            private final JobSchedulerStoreImpl store;
079            Page<MetaData> page;
080            BTreeIndex<Integer, Integer> journalRC;
081            BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
082    
083            void createIndexes(Transaction tx) throws IOException {
084                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
085                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
086            }
087    
088            void load(Transaction tx) throws IOException {
089                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
090                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
091                this.storedSchedulers.load(tx);
092                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
093                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
094                this.journalRC.load(tx);
095            }
096    
097            void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
098                for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
099                    Entry<String, JobSchedulerImpl> entry = i.next();
100                    entry.getValue().load(tx);
101                    schedulers.put(entry.getKey(), entry.getValue());
102                }
103            }
104    
105            public void read(DataInput is) throws IOException {
106                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
107                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
108                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
109                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
110                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
111                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
112            }
113    
114            public void write(DataOutput os) throws IOException {
115                os.writeLong(this.storedSchedulers.getPageId());
116                os.writeLong(this.journalRC.getPageId());
117            }
118        }
119    
120        class MetaDataMarshaller extends VariableMarshaller<MetaData> {
121            private final JobSchedulerStoreImpl store;
122    
123            MetaDataMarshaller(JobSchedulerStoreImpl store) {
124                this.store = store;
125            }
126    
127            @Override
128            public MetaData readPayload(DataInput dataIn) throws IOException {
129                MetaData rc = new MetaData(this.store);
130                rc.read(dataIn);
131                return rc;
132            }
133    
134            @Override
135            public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
136                object.write(dataOut);
137            }
138        }
139    
140        class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
141            @Override
142            public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
143                List<JobLocation> result = new ArrayList<JobLocation>();
144                int size = dataIn.readInt();
145                for (int i = 0; i < size; i++) {
146                    JobLocation jobLocation = new JobLocation();
147                    jobLocation.readExternal(dataIn);
148                    result.add(jobLocation);
149                }
150                return result;
151            }
152    
153            @Override
154            public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
155                dataOut.writeInt(value.size());
156                for (JobLocation jobLocation : value) {
157                    jobLocation.writeExternal(dataOut);
158                }
159            }
160        }
161    
162        class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
163            private final JobSchedulerStoreImpl store;
164    
165            JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
166                this.store = store;
167            }
168    
169            @Override
170            public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
171                JobSchedulerImpl result = new JobSchedulerImpl(this.store);
172                result.read(dataIn);
173                return result;
174            }
175    
176            @Override
177            public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
178                js.write(dataOut);
179            }
180        }
181    
182        @Override
183        public File getDirectory() {
184            return directory;
185        }
186    
187        @Override
188        public void setDirectory(File directory) {
189            this.directory = directory;
190        }
191    
192        @Override
193        public long size() {
194            if (!isStarted()) {
195                return 0;
196            }
197            try {
198                return journalSize.get() + pageFile.getDiskSize();
199            } catch (IOException e) {
200                throw new RuntimeException(e);
201            }
202        }
203    
204        @Override
205        public JobScheduler getJobScheduler(final String name) throws Exception {
206            JobSchedulerImpl result = this.schedulers.get(name);
207            if (result == null) {
208                final JobSchedulerImpl js = new JobSchedulerImpl(this);
209                js.setName(name);
210                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
211                    @Override
212                    public void execute(Transaction tx) throws IOException {
213                        js.createIndexes(tx);
214                        js.load(tx);
215                        metaData.storedSchedulers.put(tx, name, js);
216                    }
217                });
218                result = js;
219                this.schedulers.put(name, js);
220                if (isStarted()) {
221                    result.start();
222                }
223                this.pageFile.flush();
224            }
225            return result;
226        }
227    
228        @Override
229        synchronized public boolean removeJobScheduler(final String name) throws Exception {
230            boolean result = false;
231            final JobSchedulerImpl js = this.schedulers.remove(name);
232            result = js != null;
233            if (result) {
234                js.stop();
235                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
236                    @Override
237                    public void execute(Transaction tx) throws IOException {
238                        metaData.storedSchedulers.remove(tx, name);
239                        js.destroy(tx);
240                    }
241                });
242            }
243            return result;
244        }
245    
246        @Override
247        protected synchronized void doStart() throws Exception {
248            if (this.directory == null) {
249                this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
250            }
251            IOHelper.mkdirs(this.directory);
252            lock();
253            this.journal = new Journal();
254            this.journal.setDirectory(directory);
255            this.journal.setMaxFileLength(getJournalMaxFileLength());
256            this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
257            this.journal.setSizeAccumulator(this.journalSize);
258            this.journal.start();
259            this.pageFile = new PageFile(directory, "scheduleDB");
260            this.pageFile.setWriteBatchSize(1);
261            this.pageFile.load();
262    
263            this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
264                @Override
265                public void execute(Transaction tx) throws IOException {
266                    if (pageFile.getPageCount() == 0) {
267                        Page<MetaData> page = tx.allocate();
268                        assert page.getPageId() == 0;
269                        page.set(metaData);
270                        metaData.page = page;
271                        metaData.createIndexes(tx);
272                        tx.store(metaData.page, metaDataMarshaller, true);
273    
274                    } else {
275                        Page<MetaData> page = tx.load(0, metaDataMarshaller);
276                        metaData = page.get();
277                        metaData.page = page;
278                    }
279                    metaData.load(tx);
280                    metaData.loadScheduler(tx, schedulers);
281                    for (JobSchedulerImpl js : schedulers.values()) {
282                        try {
283                            js.start();
284                        } catch (Exception e) {
285                            JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
286                        }
287                    }
288                }
289            });
290    
291            this.pageFile.flush();
292            LOG.info(this + " started");
293        }
294    
295        @Override
296        protected synchronized void doStop(ServiceStopper stopper) throws Exception {
297            for (JobSchedulerImpl js : this.schedulers.values()) {
298                js.stop();
299            }
300            if (this.pageFile != null) {
301                this.pageFile.unload();
302            }
303            if (this.journal != null) {
304                journal.close();
305            }
306            if (this.lockFile != null) {
307                this.lockFile.unlock();
308            }
309            this.lockFile = null;
310            LOG.info(this + " stopped");
311        }
312    
313        synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
314            int logId = location.getDataFileId();
315            Integer val = this.metaData.journalRC.get(tx, logId);
316            int refCount = val != null ? val.intValue() + 1 : 1;
317            this.metaData.journalRC.put(tx, logId, refCount);
318        }
319    
320        synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
321            int logId = location.getDataFileId();
322            int refCount = this.metaData.journalRC.get(tx, logId);
323            refCount--;
324            if (refCount <= 0) {
325                this.metaData.journalRC.remove(tx, logId);
326                Set<Integer> set = new HashSet<Integer>();
327                set.add(logId);
328                this.journal.removeDataFiles(set);
329            } else {
330                this.metaData.journalRC.put(tx, logId, refCount);
331            }
332        }
333    
334        synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
335            ByteSequence result = null;
336            result = this.journal.read(location);
337            return result;
338        }
339    
340        synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
341            return this.journal.write(payload, sync);
342        }
343    
344        private void lock() throws IOException {
345            if (lockFile == null) {
346                File lockFileName = new File(directory, "lock");
347                lockFile = new LockFile(lockFileName, true);
348                if (failIfDatabaseIsLocked) {
349                    lockFile.lock();
350                } else {
351                    while (true) {
352                        try {
353                            lockFile.lock();
354                            break;
355                        } catch (IOException e) {
356                            LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000)
357                                + " seconds for the database to be unlocked. Reason: " + e);
358                            try {
359                                Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
360                            } catch (InterruptedException e1) {
361                            }
362                        }
363                    }
364                }
365            }
366        }
367    
368        PageFile getPageFile() {
369            this.pageFile.isLoaded();
370            return this.pageFile;
371        }
372    
373        public boolean isFailIfDatabaseIsLocked() {
374            return failIfDatabaseIsLocked;
375        }
376    
377        public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
378            this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
379        }
380    
381        public int getJournalMaxFileLength() {
382            return journalMaxFileLength;
383        }
384    
385        public void setJournalMaxFileLength(int journalMaxFileLength) {
386            this.journalMaxFileLength = journalMaxFileLength;
387        }
388    
389        public int getJournalMaxWriteBatchSize() {
390            return journalMaxWriteBatchSize;
391        }
392    
393        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
394            this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
395        }
396    
397        public boolean isEnableIndexWriteAsync() {
398            return enableIndexWriteAsync;
399        }
400    
401        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
402            this.enableIndexWriteAsync = enableIndexWriteAsync;
403        }
404    
405        @Override
406        public String toString() {
407            return "JobSchedulerStore:" + this.directory;
408        }
409    }