001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb.scheduler;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.File;
022    import java.io.IOException;
023    import java.util.ArrayList;
024    import java.util.HashMap;
025    import java.util.HashSet;
026    import java.util.Iterator;
027    import java.util.List;
028    import java.util.Map;
029    import java.util.Map.Entry;
030    import java.util.Set;
031    import java.util.concurrent.atomic.AtomicLong;
032    
033    import org.apache.activemq.broker.scheduler.JobScheduler;
034    import org.apache.activemq.broker.scheduler.JobSchedulerStore;
035    import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
036    import org.apache.activemq.store.kahadb.disk.journal.Journal;
037    import org.apache.activemq.store.kahadb.disk.journal.Location;
038    import org.apache.activemq.store.kahadb.disk.page.Page;
039    import org.apache.activemq.store.kahadb.disk.page.PageFile;
040    import org.apache.activemq.store.kahadb.disk.page.Transaction;
041    import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
042    import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
043    import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
044    import org.apache.activemq.util.ByteSequence;
045    import org.apache.activemq.util.IOHelper;
046    import org.apache.activemq.util.LockFile;
047    import org.apache.activemq.util.ServiceStopper;
048    import org.apache.activemq.util.ServiceSupport;
049    import org.slf4j.Logger;
050    import org.slf4j.LoggerFactory;
051    
052    public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore {
053        static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
054        private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
055    
056        public static final int CLOSED_STATE = 1;
057        public static final int OPEN_STATE = 2;
058    
059        private File directory;
060        PageFile pageFile;
061        private Journal journal;
062        protected AtomicLong journalSize = new AtomicLong(0);
063        private LockFile lockFile;
064        private boolean failIfDatabaseIsLocked;
065        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
066        private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
067        private boolean enableIndexWriteAsync = false;
068        MetaData metaData = new MetaData(this);
069        final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
070        Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
071    
072        protected class MetaData {
073            protected MetaData(JobSchedulerStoreImpl store) {
074                this.store = store;
075            }
076    
077            private final JobSchedulerStoreImpl store;
078            Page<MetaData> page;
079            BTreeIndex<Integer, Integer> journalRC;
080            BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
081    
082            void createIndexes(Transaction tx) throws IOException {
083                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
084                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
085            }
086    
087            void load(Transaction tx) throws IOException {
088                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
089                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
090                this.storedSchedulers.load(tx);
091                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
092                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
093                this.journalRC.load(tx);
094            }
095    
096            void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
097                for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
098                    Entry<String, JobSchedulerImpl> entry = i.next();
099                    entry.getValue().load(tx);
100                    schedulers.put(entry.getKey(), entry.getValue());
101                }
102            }
103    
104            public void read(DataInput is) throws IOException {
105                this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
106                this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
107                this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
108                this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
109                this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
110                this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
111            }
112    
113            public void write(DataOutput os) throws IOException {
114                os.writeLong(this.storedSchedulers.getPageId());
115                os.writeLong(this.journalRC.getPageId());
116            }
117        }
118    
119        class MetaDataMarshaller extends VariableMarshaller<MetaData> {
120            private final JobSchedulerStoreImpl store;
121    
122            MetaDataMarshaller(JobSchedulerStoreImpl store) {
123                this.store = store;
124            }
125    
126            @Override
127            public MetaData readPayload(DataInput dataIn) throws IOException {
128                MetaData rc = new MetaData(this.store);
129                rc.read(dataIn);
130                return rc;
131            }
132    
133            @Override
134            public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
135                object.write(dataOut);
136            }
137        }
138    
139        class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
140            @Override
141            public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
142                List<JobLocation> result = new ArrayList<JobLocation>();
143                int size = dataIn.readInt();
144                for (int i = 0; i < size; i++) {
145                    JobLocation jobLocation = new JobLocation();
146                    jobLocation.readExternal(dataIn);
147                    result.add(jobLocation);
148                }
149                return result;
150            }
151    
152            @Override
153            public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
154                dataOut.writeInt(value.size());
155                for (JobLocation jobLocation : value) {
156                    jobLocation.writeExternal(dataOut);
157                }
158            }
159        }
160    
161        class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
162            private final JobSchedulerStoreImpl store;
163    
164            JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
165                this.store = store;
166            }
167    
168            @Override
169            public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
170                JobSchedulerImpl result = new JobSchedulerImpl(this.store);
171                result.read(dataIn);
172                return result;
173            }
174    
175            @Override
176            public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
177                js.write(dataOut);
178            }
179        }
180    
181        @Override
182        public File getDirectory() {
183            return directory;
184        }
185    
186        @Override
187        public void setDirectory(File directory) {
188            this.directory = directory;
189        }
190    
191        @Override
192        public long size() {
193            if (!isStarted()) {
194                return 0;
195            }
196            try {
197                return journalSize.get() + pageFile.getDiskSize();
198            } catch (IOException e) {
199                throw new RuntimeException(e);
200            }
201        }
202    
203        @Override
204        public JobScheduler getJobScheduler(final String name) throws Exception {
205            JobSchedulerImpl result = this.schedulers.get(name);
206            if (result == null) {
207                final JobSchedulerImpl js = new JobSchedulerImpl(this);
208                js.setName(name);
209                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
210                    @Override
211                    public void execute(Transaction tx) throws IOException {
212                        js.createIndexes(tx);
213                        js.load(tx);
214                        metaData.storedSchedulers.put(tx, name, js);
215                    }
216                });
217                result = js;
218                this.schedulers.put(name, js);
219                if (isStarted()) {
220                    result.start();
221                }
222                this.pageFile.flush();
223            }
224            return result;
225        }
226    
227        @Override
228        synchronized public boolean removeJobScheduler(final String name) throws Exception {
229            boolean result = false;
230            final JobSchedulerImpl js = this.schedulers.remove(name);
231            result = js != null;
232            if (result) {
233                js.stop();
234                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
235                    @Override
236                    public void execute(Transaction tx) throws IOException {
237                        metaData.storedSchedulers.remove(tx, name);
238                        js.destroy(tx);
239                    }
240                });
241            }
242            return result;
243        }
244    
245        @Override
246        protected synchronized void doStart() throws Exception {
247            if (this.directory == null) {
248                this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
249            }
250            IOHelper.mkdirs(this.directory);
251            lock();
252            this.journal = new Journal();
253            this.journal.setDirectory(directory);
254            this.journal.setMaxFileLength(getJournalMaxFileLength());
255            this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
256            this.journal.setSizeAccumulator(this.journalSize);
257            this.journal.start();
258            this.pageFile = new PageFile(directory, "scheduleDB");
259            this.pageFile.setWriteBatchSize(1);
260            this.pageFile.load();
261    
262            this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
263                @Override
264                public void execute(Transaction tx) throws IOException {
265                    if (pageFile.getPageCount() == 0) {
266                        Page<MetaData> page = tx.allocate();
267                        assert page.getPageId() == 0;
268                        page.set(metaData);
269                        metaData.page = page;
270                        metaData.createIndexes(tx);
271                        tx.store(metaData.page, metaDataMarshaller, true);
272    
273                    } else {
274                        Page<MetaData> page = tx.load(0, metaDataMarshaller);
275                        metaData = page.get();
276                        metaData.page = page;
277                    }
278                    metaData.load(tx);
279                    metaData.loadScheduler(tx, schedulers);
280                    for (JobSchedulerImpl js : schedulers.values()) {
281                        try {
282                            js.start();
283                        } catch (Exception e) {
284                            JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
285                        }
286                    }
287                }
288            });
289    
290            this.pageFile.flush();
291            LOG.info(this + " started");
292        }
293    
294        @Override
295        protected synchronized void doStop(ServiceStopper stopper) throws Exception {
296            for (JobSchedulerImpl js : this.schedulers.values()) {
297                js.stop();
298            }
299            if (this.pageFile != null) {
300                this.pageFile.unload();
301            }
302            if (this.journal != null) {
303                journal.close();
304            }
305            if (this.lockFile != null) {
306                this.lockFile.unlock();
307            }
308            this.lockFile = null;
309            LOG.info(this + " stopped");
310        }
311    
312        synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
313            int logId = location.getDataFileId();
314            Integer val = this.metaData.journalRC.get(tx, logId);
315            int refCount = val != null ? val.intValue() + 1 : 1;
316            this.metaData.journalRC.put(tx, logId, refCount);
317        }
318    
319        synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
320            int logId = location.getDataFileId();
321            int refCount = this.metaData.journalRC.get(tx, logId);
322            refCount--;
323            if (refCount <= 0) {
324                this.metaData.journalRC.remove(tx, logId);
325                Set<Integer> set = new HashSet<Integer>();
326                set.add(logId);
327                this.journal.removeDataFiles(set);
328            } else {
329                this.metaData.journalRC.put(tx, logId, refCount);
330            }
331        }
332    
333        synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
334            ByteSequence result = null;
335            result = this.journal.read(location);
336            return result;
337        }
338    
339        synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
340            return this.journal.write(payload, sync);
341        }
342    
343        private void lock() throws IOException {
344            if (lockFile == null) {
345                File lockFileName = new File(directory, "lock");
346                lockFile = new LockFile(lockFileName, true);
347                if (failIfDatabaseIsLocked) {
348                    lockFile.lock();
349                } else {
350                    while (true) {
351                        try {
352                            lockFile.lock();
353                            break;
354                        } catch (IOException e) {
355                            LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000)
356                                + " seconds for the database to be unlocked. Reason: " + e);
357                            try {
358                                Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
359                            } catch (InterruptedException e1) {
360                            }
361                        }
362                    }
363                }
364            }
365        }
366    
367        PageFile getPageFile() {
368            this.pageFile.isLoaded();
369            return this.pageFile;
370        }
371    
372        public boolean isFailIfDatabaseIsLocked() {
373            return failIfDatabaseIsLocked;
374        }
375    
376        public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
377            this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
378        }
379    
380        public int getJournalMaxFileLength() {
381            return journalMaxFileLength;
382        }
383    
384        public void setJournalMaxFileLength(int journalMaxFileLength) {
385            this.journalMaxFileLength = journalMaxFileLength;
386        }
387    
388        public int getJournalMaxWriteBatchSize() {
389            return journalMaxWriteBatchSize;
390        }
391    
392        public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
393            this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
394        }
395    
396        public boolean isEnableIndexWriteAsync() {
397            return enableIndexWriteAsync;
398        }
399    
400        public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
401            this.enableIndexWriteAsync = enableIndexWriteAsync;
402        }
403    
404        @Override
405        public String toString() {
406            return "JobSchedulerStore:" + this.directory;
407        }
408    }