001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.journal;
018    
019    import java.io.File;
020    import java.io.IOException;
021    
022    import org.apache.activeio.journal.Journal;
023    import org.apache.activeio.journal.active.JournalImpl;
024    import org.apache.activeio.journal.active.JournalLockedException;
025    import org.apache.activemq.broker.Locker;
026    import org.apache.activemq.store.PersistenceAdapter;
027    import org.apache.activemq.store.PersistenceAdapterFactory;
028    import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
029    import org.apache.activemq.store.jdbc.JDBCAdapter;
030    import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
031    import org.apache.activemq.store.jdbc.Statements;
032    import org.apache.activemq.thread.TaskRunnerFactory;
033    import org.apache.activemq.util.ServiceStopper;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    
037    /**
038     * Factory class that can create PersistenceAdapter objects.
039     * 
040     * @org.apache.xbean.XBean
041     * 
042     */
043    public class JournalPersistenceAdapterFactory extends DataSourceServiceSupport implements PersistenceAdapterFactory {
044    
045        private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
046    
047        private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class);
048    
049        private int journalLogFileSize = 1024 * 1024 * 20;
050        private int journalLogFiles = 2;
051        private TaskRunnerFactory taskRunnerFactory;
052        private Journal journal;
053        private boolean useJournal = true;
054        private boolean useQuickJournal;
055        private File journalArchiveDirectory;
056        private boolean failIfJournalIsLocked;
057        private int journalThreadPriority = Thread.MAX_PRIORITY;
058        private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
059        private boolean useDedicatedTaskRunner;
060    
061        public PersistenceAdapter createPersistenceAdapter() throws IOException {
062            jdbcPersistenceAdapter.setDataSource(getDataSource());
063    
064            if (!useJournal) {
065                return jdbcPersistenceAdapter;
066            }
067            JournalPersistenceAdapter result =  new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
068            result.setDirectory(getDataDirectoryFile());
069            return result;
070    
071        }
072    
073        public int getJournalLogFiles() {
074            return journalLogFiles;
075        }
076    
077        /**
078         * Sets the number of journal log files to use
079         */
080        public void setJournalLogFiles(int journalLogFiles) {
081            this.journalLogFiles = journalLogFiles;
082        }
083    
084        public int getJournalLogFileSize() {
085            return journalLogFileSize;
086        }
087    
088        /**
089         * Sets the size of the journal log files
090         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
091         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
092         */
093        public void setJournalLogFileSize(int journalLogFileSize) {
094            this.journalLogFileSize = journalLogFileSize;
095        }
096    
097        public JDBCPersistenceAdapter getJdbcAdapter() {
098            return jdbcPersistenceAdapter;
099        }
100    
101        public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
102            this.jdbcPersistenceAdapter = jdbcAdapter;
103        }
104    
105        public boolean isUseJournal() {
106            return useJournal;
107        }
108    
109        /**
110         * Enables or disables the use of the journal. The default is to use the
111         * journal
112         * 
113         * @param useJournal
114         */
115        public void setUseJournal(boolean useJournal) {
116            this.useJournal = useJournal;
117        }
118    
119        public boolean isUseDedicatedTaskRunner() {
120            return useDedicatedTaskRunner;
121        }
122        
123        public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
124            this.useDedicatedTaskRunner = useDedicatedTaskRunner;
125        }
126        
127        public TaskRunnerFactory getTaskRunnerFactory() {
128            if (taskRunnerFactory == null) {
129                taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
130                                                          true, 1000, isUseDedicatedTaskRunner());
131            }
132            return taskRunnerFactory;
133        }
134    
135        public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
136            this.taskRunnerFactory = taskRunnerFactory;
137        }
138    
139        public Journal getJournal() throws IOException {
140            if (journal == null) {
141                createJournal();
142            }
143            return journal;
144        }
145    
146        public void setJournal(Journal journal) {
147            this.journal = journal;
148        }
149    
150        public File getJournalArchiveDirectory() {
151            if (journalArchiveDirectory == null && useQuickJournal) {
152                journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
153            }
154            return journalArchiveDirectory;
155        }
156    
157        public void setJournalArchiveDirectory(File journalArchiveDirectory) {
158            this.journalArchiveDirectory = journalArchiveDirectory;
159        }
160    
161        public boolean isUseQuickJournal() {
162            return useQuickJournal;
163        }
164    
165        /**
166         * Enables or disables the use of quick journal, which keeps messages in the
167         * journal and just stores a reference to the messages in JDBC. Defaults to
168         * false so that messages actually reside long term in the JDBC database.
169         */
170        public void setUseQuickJournal(boolean useQuickJournal) {
171            this.useQuickJournal = useQuickJournal;
172        }
173    
174        public JDBCAdapter getAdapter() throws IOException {
175            return jdbcPersistenceAdapter.getAdapter();
176        }
177    
178        public void setAdapter(JDBCAdapter adapter) {
179            jdbcPersistenceAdapter.setAdapter(adapter);
180        }
181    
182        public Statements getStatements() {
183            return jdbcPersistenceAdapter.getStatements();
184        }
185    
186        public void setStatements(Statements statements) {
187            jdbcPersistenceAdapter.setStatements(statements);
188        }
189    
190        /**
191         * Sets whether or not an exclusive database lock should be used to enable
192         * JDBC Master/Slave. Enabled by default.
193         */
194        public void setUseDatabaseLock(boolean useDatabaseLock) {
195            jdbcPersistenceAdapter.setUseLock(useDatabaseLock);
196        }
197    
198        public boolean isCreateTablesOnStartup() {
199            return jdbcPersistenceAdapter.isCreateTablesOnStartup();
200        }
201    
202        /**
203         * Sets whether or not tables are created on startup
204         */
205        public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
206            jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
207        }
208    
209        public int getJournalThreadPriority() {
210            return journalThreadPriority;
211        }
212    
213        /**
214         * Sets the thread priority of the journal thread
215         */
216        public void setJournalThreadPriority(int journalThreadPriority) {
217            this.journalThreadPriority = journalThreadPriority;
218        }
219    
220        /**
221         * @throws IOException
222         */
223        protected void createJournal() throws IOException {
224            File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
225            if (failIfJournalIsLocked) {
226                journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
227                                          getJournalArchiveDirectory());
228            } else {
229                while (true) {
230                    try {
231                        journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
232                                                  getJournalArchiveDirectory());
233                        break;
234                    } catch (JournalLockedException e) {
235                        LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000)
236                                 + " seconds for the journal to be unlocked.");
237                        try {
238                            Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
239                        } catch (InterruptedException e1) {
240                        }
241                    }
242                }
243            }
244        }
245    
246        @Override
247        public Locker createDefaultLocker() throws IOException {
248            return null;
249        }
250    
251        @Override
252        public void init() throws Exception {
253        }
254    
255        @Override
256        protected void doStop(ServiceStopper stopper) throws Exception {}
257    
258        @Override
259        protected void doStart() throws Exception {}
260    }