001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.broker.scheduler;
018    
019    import org.apache.activemq.util.IOHelper;
020    import org.apache.activemq.util.ServiceStopper;
021    import org.apache.activemq.util.ServiceSupport;
022    import org.apache.kahadb.index.BTreeIndex;
023    import org.apache.kahadb.journal.Journal;
024    import org.apache.kahadb.journal.Location;
025    import org.apache.kahadb.page.Page;
026    import org.apache.kahadb.page.PageFile;
027    import org.apache.kahadb.page.Transaction;
028    import org.apache.kahadb.util.ByteSequence;
029    import org.apache.kahadb.util.IntegerMarshaller;
030    import org.apache.kahadb.util.LockFile;
031    import org.apache.kahadb.util.StringMarshaller;
032    import org.apache.kahadb.util.VariableMarshaller;
033    import org.slf4j.Logger;
034    import org.slf4j.LoggerFactory;
035    
036    import java.io.DataInput;
037    import java.io.DataOutput;
038    import java.io.File;
039    import java.io.IOException;
040    import java.util.ArrayList;
041    import java.util.HashMap;
042    import java.util.HashSet;
043    import java.util.Iterator;
044    import java.util.List;
045    import java.util.Map;
046    import java.util.Map.Entry;
047    import java.util.Set;
048    
049    public class JobSchedulerStore extends ServiceSupport {
050        static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
051        private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052    
053        public static final int CLOSED_STATE = 1;
054        public static final int OPEN_STATE = 2;
055    
056        private File directory;
057        PageFile pageFile;
058        private Journal journal;
059        private LockFile lockFile;
060        private boolean failIfDatabaseIsLocked;
061        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
062        private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
063        private boolean enableIndexWriteAsync = false;
064        // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
065        MetaData metaData = new MetaData(this);
066        final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
067        Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
068    
069        protected class MetaData {
070            protected MetaData(JobSchedulerStore store) {
071                this.store = store;
072            }
073            private final JobSchedulerStore store;
074            Page<MetaData> page;
075            BTreeIndex<Integer, Integer> journalRC;
076            BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
077    
078            void createIndexes(Transaction tx) throws IOException {
079                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
080                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
081            }
082    
083            void load(Transaction tx) throws IOException {
084                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
085                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
086                this.storedSchedulers.load(tx);
087                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
088                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
089                this.journalRC.load(tx);
090            }
091    
092            void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
093                for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
094                    Entry<String, JobSchedulerImpl> entry = i.next();
095                    entry.getValue().load(tx);
096                    schedulers.put(entry.getKey(), entry.getValue());
097                }
098            }
099    
100            public void read(DataInput is) throws IOException {
101                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
102                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
103                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
104                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
105                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
106                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
107            }
108    
109            public void write(DataOutput os) throws IOException {
110                os.writeLong(this.storedSchedulers.getPageId());
111                os.writeLong(this.journalRC.getPageId());
112    
113            }
114        }
115    
116        class MetaDataMarshaller extends VariableMarshaller<MetaData> {
117            private final JobSchedulerStore store;
118    
119            MetaDataMarshaller(JobSchedulerStore store) {
120                this.store = store;
121            }
122            public MetaData readPayload(DataInput dataIn) throws IOException {
123                MetaData rc = new MetaData(this.store);
124                rc.read(dataIn);
125                return rc;
126            }
127    
128            public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
129                object.write(dataOut);
130            }
131        }
132    
133        class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
134            public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
135                List<JobLocation> result = new ArrayList<JobLocation>();
136                int size = dataIn.readInt();
137                for (int i = 0; i < size; i++) {
138                    JobLocation jobLocation = new JobLocation();
139                    jobLocation.readExternal(dataIn);
140                    result.add(jobLocation);
141                }
142                return result;
143            }
144    
145            public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
146                dataOut.writeInt(value.size());
147                for (JobLocation jobLocation : value) {
148                    jobLocation.writeExternal(dataOut);
149                }
150            }
151        }
152    
153        class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
154            private final JobSchedulerStore store;
155            JobSchedulerMarshaller(JobSchedulerStore store) {
156                this.store = store;
157            }
158            public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
159                JobSchedulerImpl result = new JobSchedulerImpl(this.store);
160                result.read(dataIn);
161                return result;
162            }
163    
164            public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
165                js.write(dataOut);
166            }
167        }
168    
169        public File getDirectory() {
170            return directory;
171        }
172    
173        public void setDirectory(File directory) {
174            this.directory = directory;
175        }
176        
177        public long size() {
178            if ( !isStarted() ) {
179                return 0;
180            }
181            try {
182                return journal.getDiskSize() + pageFile.getDiskSize();
183            } catch (IOException e) {
184                throw new RuntimeException(e);
185            }
186        }
187    
188        public JobScheduler getJobScheduler(final String name) throws Exception {
189            JobSchedulerImpl result = this.schedulers.get(name);
190            if (result == null) {
191                final JobSchedulerImpl js = new JobSchedulerImpl(this);
192                js.setName(name);
193                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
194                    public void execute(Transaction tx) throws IOException {
195                        js.createIndexes(tx);
196                        js.load(tx);
197                        metaData.storedSchedulers.put(tx, name, js);
198                    }
199                });
200                result = js;
201                this.schedulers.put(name, js);
202                if (isStarted()) {
203                    result.start();
204                }
205                this.pageFile.flush();
206            }
207            return result;
208        }
209    
210        synchronized public boolean removeJobScheduler(final String name) throws Exception {
211            boolean result = false;
212            final JobSchedulerImpl js = this.schedulers.remove(name);
213            result = js != null;
214            if (result) {
215                js.stop();
216                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
217                    public void execute(Transaction tx) throws IOException {
218                        metaData.storedSchedulers.remove(tx, name);
219                        js.destroy(tx);
220                    }
221                });
222            }
223            return result;
224        }
225    
226        @Override
227        protected synchronized void doStart() throws Exception {
228            if (this.directory == null) {
229                this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
230            }
231            IOHelper.mkdirs(this.directory);
232            lock();
233            this.journal = new Journal();
234            this.journal.setDirectory(directory);
235            this.journal.setMaxFileLength(getJournalMaxFileLength());
236            this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
237            this.journal.start();
238            this.pageFile = new PageFile(directory, "scheduleDB");
239            this.pageFile.setWriteBatchSize(1);
240            this.pageFile.load();
241    
242            this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
243                public void execute(Transaction tx) throws IOException {
244                    if (pageFile.getPageCount() == 0) {
245                        Page<MetaData> page = tx.allocate();
246                        assert page.getPageId() == 0;
247                        page.set(metaData);
248                        metaData.page = page;
249                        metaData.createIndexes(tx);
250                        tx.store(metaData.page, metaDataMarshaller, true);
251    
252                    } else {
253                        Page<MetaData> page = tx.load(0, metaDataMarshaller);
254                        metaData = page.get();
255                        metaData.page = page;
256                    }
257                    metaData.load(tx);
258                    metaData.loadScheduler(tx, schedulers);
259                    for (JobSchedulerImpl js :schedulers.values()) {
260                        try {
261                            js.start();
262                        } catch (Exception e) {
263                            JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
264                        }
265                   }
266                }
267            });
268    
269            this.pageFile.flush();
270            LOG.info(this + " started");
271        }
272        
273        @Override
274        protected synchronized void doStop(ServiceStopper stopper) throws Exception {
275            for (JobSchedulerImpl js : this.schedulers.values()) {
276                js.stop();
277            }
278            if (this.pageFile != null) {
279                this.pageFile.unload();
280            }
281            if (this.journal != null) {
282                journal.close();
283            }
284            if (this.lockFile != null) {
285                this.lockFile.unlock();
286            }
287            this.lockFile = null;
288            LOG.info(this + " stopped");
289    
290        }
291    
292        synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
293            int logId = location.getDataFileId();
294            Integer val = this.metaData.journalRC.get(tx, logId);
295            int refCount = val != null ? val.intValue() + 1 : 1;
296            this.metaData.journalRC.put(tx, logId, refCount);
297    
298        }
299    
300        synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
301            int logId = location.getDataFileId();
302            int refCount = this.metaData.journalRC.get(tx, logId);
303            refCount--;
304            if (refCount <= 0) {
305                this.metaData.journalRC.remove(tx, logId);
306                Set<Integer> set = new HashSet<Integer>();
307                set.add(logId);
308                this.journal.removeDataFiles(set);
309            } else {
310                this.metaData.journalRC.put(tx, logId, refCount);
311            }
312    
313        }
314    
315        synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
316            ByteSequence result = null;
317            result = this.journal.read(location);
318            return result;
319        }
320    
321        synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
322            return this.journal.write(payload, sync);
323        }
324    
325        private void lock() throws IOException {
326            if (lockFile == null) {
327                File lockFileName = new File(directory, "lock");
328                lockFile = new LockFile(lockFileName, true);
329                if (failIfDatabaseIsLocked) {
330                    lockFile.lock();
331                } else {
332                    while (true) {
333                        try {
334                            lockFile.lock();
335                            break;
336                        } catch (IOException e) {
337                            LOG.info("Database " + lockFileName + " is locked... waiting "
338                                    + (DATABASE_LOCKED_WAIT_DELAY / 1000)
339                                    + " seconds for the database to be unlocked. Reason: " + e);
340                            try {
341                                Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
342                            } catch (InterruptedException e1) {
343                            }
344                        }
345                    }
346                }
347            }
348        }
349    
350        PageFile getPageFile() {
351            this.pageFile.isLoaded();
352            return this.pageFile;
353        }
354    
355        public boolean isFailIfDatabaseIsLocked() {
356            return failIfDatabaseIsLocked;
357        }
358    
359        public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
360            this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
361        }
362    
363        public int getJournalMaxFileLength() {
364            return journalMaxFileLength;
365        }
366    
367        public void setJournalMaxFileLength(int journalMaxFileLength) {
368            this.journalMaxFileLength = journalMaxFileLength;
369        }
370    
371        public int getJournalMaxWriteBatchSize() {
372            return journalMaxWriteBatchSize;
373        }
374    
375        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
376            this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
377        }
378    
379        public boolean isEnableIndexWriteAsync() {
380            return enableIndexWriteAsync;
381        }
382    
383        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
384            this.enableIndexWriteAsync = enableIndexWriteAsync;
385        }
386    
387        @Override
388        public String toString() {
389            return "JobSchedulerStore:" + this.directory;
390        }
391    
392    }