001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.journal;
018
019 import java.io.File;
020 import java.io.IOException;
021
022 import org.apache.activeio.journal.Journal;
023 import org.apache.activeio.journal.active.JournalImpl;
024 import org.apache.activeio.journal.active.JournalLockedException;
025 import org.apache.activemq.broker.Locker;
026 import org.apache.activemq.store.PersistenceAdapter;
027 import org.apache.activemq.store.PersistenceAdapterFactory;
028 import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
029 import org.apache.activemq.store.jdbc.JDBCAdapter;
030 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
031 import org.apache.activemq.store.jdbc.Statements;
032 import org.apache.activemq.thread.TaskRunnerFactory;
033 import org.apache.activemq.util.ServiceStopper;
034 import org.slf4j.Logger;
035 import org.slf4j.LoggerFactory;
036
037 /**
038 * Factory class that can create PersistenceAdapter objects.
039 *
040 * @org.apache.xbean.XBean
041 *
042 */
043 public class JournalPersistenceAdapterFactory extends DataSourceServiceSupport implements PersistenceAdapterFactory {
044
045 private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
046
047 private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class);
048
049 private int journalLogFileSize = 1024 * 1024 * 20;
050 private int journalLogFiles = 2;
051 private TaskRunnerFactory taskRunnerFactory;
052 private Journal journal;
053 private boolean useJournal = true;
054 private boolean useQuickJournal;
055 private File journalArchiveDirectory;
056 private boolean failIfJournalIsLocked;
057 private int journalThreadPriority = Thread.MAX_PRIORITY;
058 private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
059 private boolean useDedicatedTaskRunner;
060
061 public PersistenceAdapter createPersistenceAdapter() throws IOException {
062 jdbcPersistenceAdapter.setDataSource(getDataSource());
063
064 if (!useJournal) {
065 return jdbcPersistenceAdapter;
066 }
067 JournalPersistenceAdapter result = new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
068 result.setDirectory(getDataDirectoryFile());
069 return result;
070
071 }
072
073 public int getJournalLogFiles() {
074 return journalLogFiles;
075 }
076
077 /**
078 * Sets the number of journal log files to use
079 */
080 public void setJournalLogFiles(int journalLogFiles) {
081 this.journalLogFiles = journalLogFiles;
082 }
083
084 public int getJournalLogFileSize() {
085 return journalLogFileSize;
086 }
087
088 /**
089 * Sets the size of the journal log files
090 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
091 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
092 */
093 public void setJournalLogFileSize(int journalLogFileSize) {
094 this.journalLogFileSize = journalLogFileSize;
095 }
096
097 public JDBCPersistenceAdapter getJdbcAdapter() {
098 return jdbcPersistenceAdapter;
099 }
100
101 public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) {
102 this.jdbcPersistenceAdapter = jdbcAdapter;
103 }
104
105 public boolean isUseJournal() {
106 return useJournal;
107 }
108
109 /**
110 * Enables or disables the use of the journal. The default is to use the
111 * journal
112 *
113 * @param useJournal
114 */
115 public void setUseJournal(boolean useJournal) {
116 this.useJournal = useJournal;
117 }
118
119 public boolean isUseDedicatedTaskRunner() {
120 return useDedicatedTaskRunner;
121 }
122
123 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
124 this.useDedicatedTaskRunner = useDedicatedTaskRunner;
125 }
126
127 public TaskRunnerFactory getTaskRunnerFactory() {
128 if (taskRunnerFactory == null) {
129 taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority,
130 true, 1000, isUseDedicatedTaskRunner());
131 }
132 return taskRunnerFactory;
133 }
134
135 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
136 this.taskRunnerFactory = taskRunnerFactory;
137 }
138
139 public Journal getJournal() throws IOException {
140 if (journal == null) {
141 createJournal();
142 }
143 return journal;
144 }
145
146 public void setJournal(Journal journal) {
147 this.journal = journal;
148 }
149
150 public File getJournalArchiveDirectory() {
151 if (journalArchiveDirectory == null && useQuickJournal) {
152 journalArchiveDirectory = new File(getDataDirectoryFile(), "journal");
153 }
154 return journalArchiveDirectory;
155 }
156
157 public void setJournalArchiveDirectory(File journalArchiveDirectory) {
158 this.journalArchiveDirectory = journalArchiveDirectory;
159 }
160
161 public boolean isUseQuickJournal() {
162 return useQuickJournal;
163 }
164
165 /**
166 * Enables or disables the use of quick journal, which keeps messages in the
167 * journal and just stores a reference to the messages in JDBC. Defaults to
168 * false so that messages actually reside long term in the JDBC database.
169 */
170 public void setUseQuickJournal(boolean useQuickJournal) {
171 this.useQuickJournal = useQuickJournal;
172 }
173
174 public JDBCAdapter getAdapter() throws IOException {
175 return jdbcPersistenceAdapter.getAdapter();
176 }
177
178 public void setAdapter(JDBCAdapter adapter) {
179 jdbcPersistenceAdapter.setAdapter(adapter);
180 }
181
182 public Statements getStatements() {
183 return jdbcPersistenceAdapter.getStatements();
184 }
185
186 public void setStatements(Statements statements) {
187 jdbcPersistenceAdapter.setStatements(statements);
188 }
189
190 /**
191 * Sets whether or not an exclusive database lock should be used to enable
192 * JDBC Master/Slave. Enabled by default.
193 */
194 public void setUseDatabaseLock(boolean useDatabaseLock) {
195 jdbcPersistenceAdapter.setUseLock(useDatabaseLock);
196 }
197
198 public boolean isCreateTablesOnStartup() {
199 return jdbcPersistenceAdapter.isCreateTablesOnStartup();
200 }
201
202 /**
203 * Sets whether or not tables are created on startup
204 */
205 public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
206 jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup);
207 }
208
209 public int getJournalThreadPriority() {
210 return journalThreadPriority;
211 }
212
213 /**
214 * Sets the thread priority of the journal thread
215 */
216 public void setJournalThreadPriority(int journalThreadPriority) {
217 this.journalThreadPriority = journalThreadPriority;
218 }
219
220 /**
221 * @throws IOException
222 */
223 protected void createJournal() throws IOException {
224 File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile();
225 if (failIfJournalIsLocked) {
226 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
227 getJournalArchiveDirectory());
228 } else {
229 while (true) {
230 try {
231 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize,
232 getJournalArchiveDirectory());
233 break;
234 } catch (JournalLockedException e) {
235 LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000)
236 + " seconds for the journal to be unlocked.");
237 try {
238 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
239 } catch (InterruptedException e1) {
240 }
241 }
242 }
243 }
244 }
245
246 @Override
247 public Locker createDefaultLocker() throws IOException {
248 return null;
249 }
250
251 @Override
252 public void init() throws Exception {
253 }
254
255 @Override
256 protected void doStop(ServiceStopper stopper) throws Exception {}
257
258 @Override
259 protected void doStart() throws Exception {}
260 }