001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.kahadb;
018
019 import java.io.File;
020 import java.io.FileFilter;
021 import java.io.IOException;
022 import java.nio.charset.Charset;
023 import java.util.HashMap;
024 import java.util.HashSet;
025 import java.util.LinkedList;
026 import java.util.List;
027 import java.util.Map;
028 import java.util.Set;
029
030 import org.apache.activemq.broker.BrokerService;
031 import org.apache.activemq.broker.BrokerServiceAware;
032 import org.apache.activemq.broker.ConnectionContext;
033 import org.apache.activemq.command.ActiveMQDestination;
034 import org.apache.activemq.command.ActiveMQQueue;
035 import org.apache.activemq.command.ActiveMQTopic;
036 import org.apache.activemq.command.LocalTransactionId;
037 import org.apache.activemq.command.ProducerId;
038 import org.apache.activemq.command.TransactionId;
039 import org.apache.activemq.command.XATransactionId;
040 import org.apache.activemq.filter.AnyDestination;
041 import org.apache.activemq.filter.DestinationMap;
042 import org.apache.activemq.protobuf.Buffer;
043 import org.apache.activemq.store.MessageStore;
044 import org.apache.activemq.store.PersistenceAdapter;
045 import org.apache.activemq.store.TopicMessageStore;
046 import org.apache.activemq.store.TransactionStore;
047 import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
048 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
049 import org.apache.activemq.usage.SystemUsage;
050 import org.apache.activemq.util.IOHelper;
051 import org.apache.activemq.util.IntrospectionSupport;
052 import org.slf4j.Logger;
053 import org.slf4j.LoggerFactory;
054
055 /**
056 * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} that supports
057 * distribution of destinations across multiple kahaDB persistence adapters
058 *
059 * @org.apache.xbean.XBean element="mKahaDB"
060 */
061 public class MultiKahaDBPersistenceAdapter extends DestinationMap implements PersistenceAdapter, BrokerServiceAware {
062 static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
063
064 final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
065 final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
066
067 BrokerService brokerService;
068 List<KahaDBPersistenceAdapter> adapters = new LinkedList<KahaDBPersistenceAdapter>();
069 private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
070
071 MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
072
073 // all local store transactions are XA, 2pc if more than one adapter involved
074 TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
075 @Override
076 public KahaTransactionInfo transform(TransactionId txid) {
077 if (txid == null) {
078 return null;
079 }
080 KahaTransactionInfo rc = new KahaTransactionInfo();
081 KahaXATransactionId kahaTxId = new KahaXATransactionId();
082 if (txid.isLocalTransaction()) {
083 LocalTransactionId t = (LocalTransactionId) txid;
084 kahaTxId.setBranchQualifier(new Buffer(Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"))));
085 kahaTxId.setGlobalTransactionId(new Buffer(t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"))));
086 kahaTxId.setFormatId(LOCAL_FORMAT_ID_MAGIC);
087 } else {
088 XATransactionId t = (XATransactionId) txid;
089 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
090 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
091 kahaTxId.setFormatId(t.getFormatId());
092 }
093 rc.setXaTransactionId(kahaTxId);
094 return rc;
095 }
096 };
097
098 /**
099 * Sets the FilteredKahaDBPersistenceAdapter entries
100 *
101 * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
102 */
103 @SuppressWarnings({ "rawtypes", "unchecked" })
104 public void setFilteredPersistenceAdapters(List entries) {
105 for (Object entry : entries) {
106 FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
107 KahaDBPersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
108 if (filteredAdapter.getDestination() == null) {
109 filteredAdapter.setDestination(matchAll);
110 }
111
112 if (filteredAdapter.isPerDestination()) {
113 configureDirectory(adapter, null);
114 // per destination adapters will be created on demand or during recovery
115 continue;
116 } else {
117 configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
118 }
119
120 configureAdapter(adapter);
121 adapters.add(adapter);
122 }
123 super.setEntries(entries);
124 }
125
126 private String nameFromDestinationFilter(ActiveMQDestination destination) {
127 return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
128 }
129
130 public boolean isLocalXid(TransactionId xid) {
131 return xid instanceof XATransactionId &&
132 ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
133 }
134
135 public void beginTransaction(ConnectionContext context) throws IOException {
136 throw new IllegalStateException();
137 }
138
139 public void checkpoint(final boolean sync) throws IOException {
140 for (PersistenceAdapter persistenceAdapter : adapters) {
141 persistenceAdapter.checkpoint(sync);
142 }
143 }
144
145 public void commitTransaction(ConnectionContext context) throws IOException {
146 throw new IllegalStateException();
147 }
148
149 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
150 PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
151 return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
152 }
153
154 private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) {
155 Object result = this.chooseValue(destination);
156 if (result == null) {
157 throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
158 }
159 FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
160 if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
161 result = addAdapter(filteredAdapter, destination);
162 startAdapter(((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter(), destination.getQualifiedName());
163 if (LOG.isTraceEnabled()) {
164 LOG.info("created per destination adapter for: " + destination + ", " + result);
165 }
166 }
167 return ((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter();
168 }
169
170 private void startAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
171 try {
172 kahaDBPersistenceAdapter.start();
173 } catch (Exception e) {
174 RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
175 LOG.error(detail.toString(), e);
176 throw detail;
177 }
178 }
179
180 private void stopAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
181 try {
182 kahaDBPersistenceAdapter.stop();
183 } catch (Exception e) {
184 RuntimeException detail = new RuntimeException("Failed to stop per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
185 LOG.error(detail.toString(), e);
186 throw detail;
187 }
188 }
189
190 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
191 PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
192 return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
193 }
194
195 public TransactionStore createTransactionStore() throws IOException {
196 return transactionStore;
197 }
198
199 public void deleteAllMessages() throws IOException {
200 for (PersistenceAdapter persistenceAdapter : adapters) {
201 persistenceAdapter.deleteAllMessages();
202 }
203 transactionStore.deleteAllMessages();
204 IOHelper.deleteChildren(getDirectory());
205 }
206
207 public Set<ActiveMQDestination> getDestinations() {
208 Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
209 for (PersistenceAdapter persistenceAdapter : adapters) {
210 results.addAll(persistenceAdapter.getDestinations());
211 }
212 return results;
213 }
214
215 public long getLastMessageBrokerSequenceId() throws IOException {
216 long maxId = -1;
217 for (PersistenceAdapter persistenceAdapter : adapters) {
218 maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
219 }
220 return maxId;
221 }
222
223 public long getLastProducerSequenceId(ProducerId id) throws IOException {
224 long maxId = -1;
225 for (PersistenceAdapter persistenceAdapter : adapters) {
226 maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
227 }
228 return maxId;
229 }
230
231 public void removeQueueMessageStore(ActiveMQQueue destination) {
232 PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination);
233 adapter.removeQueueMessageStore(destination);
234 if (adapter instanceof KahaDBPersistenceAdapter) {
235 adapter.removeQueueMessageStore(destination);
236 removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
237 }
238 }
239
240 public void removeTopicMessageStore(ActiveMQTopic destination) {
241 PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination);
242 if (adapter instanceof KahaDBPersistenceAdapter) {
243 adapter.removeTopicMessageStore(destination);
244 removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
245 }
246 }
247
248 private void removeMessageStore(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
249 if (adapter.getDestinations().isEmpty()) {
250 stopAdapter(adapter, destination.toString());
251 File adapterDir = adapter.getDirectory();
252 if (adapterDir != null) {
253 if (IOHelper.deleteFile(adapterDir)) {
254 if (LOG.isTraceEnabled()) {
255 LOG.info("deleted per destination adapter directory for: " + destination);
256 }
257 } else {
258 if (LOG.isTraceEnabled()) {
259 LOG.info("failed to deleted per destination adapter directory for: " + destination);
260 }
261 }
262 }
263 }
264 }
265
266 public void rollbackTransaction(ConnectionContext context) throws IOException {
267 throw new IllegalStateException();
268 }
269
270 public void setBrokerName(String brokerName) {
271 for (PersistenceAdapter persistenceAdapter : adapters) {
272 persistenceAdapter.setBrokerName(brokerName);
273 }
274 }
275
276 public void setUsageManager(SystemUsage usageManager) {
277 for (PersistenceAdapter persistenceAdapter : adapters) {
278 persistenceAdapter.setUsageManager(usageManager);
279 }
280 }
281
282 public long size() {
283 long size = 0;
284 for (PersistenceAdapter persistenceAdapter : adapters) {
285 size += persistenceAdapter.size();
286 }
287 return size;
288 }
289
290 public void start() throws Exception {
291 Object result = this.chooseValue(matchAll);
292 if (result != null) {
293 FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
294 if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
295 findAndRegisterExistingAdapters(filteredAdapter);
296 }
297 }
298 for (PersistenceAdapter persistenceAdapter : adapters) {
299 persistenceAdapter.start();
300 }
301 }
302
303 private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) {
304 FileFilter destinationNames = new FileFilter() {
305 @Override
306 public boolean accept(File file) {
307 return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
308 }
309 };
310 File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
311 if (candidates != null) {
312 for (File candidate : candidates) {
313 registerExistingAdapter(template, candidate);
314 }
315 }
316 }
317
318 private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) {
319 KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
320 startAdapter(adapter, candidate.getName());
321 Set<ActiveMQDestination> destinations = adapter.getDestinations();
322 if (destinations.size() != 0) {
323 registerAdapter(adapter, destinations.toArray(new ActiveMQDestination[]{})[0]);
324 } else {
325 stopAdapter(adapter, candidate.getName());
326 }
327 }
328
329 private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) {
330 KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination));
331 return registerAdapter(adapter, destination);
332 }
333
334 private KahaDBPersistenceAdapter adapterFromTemplate(KahaDBPersistenceAdapter template, String destinationName) {
335 KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template);
336 configureAdapter(adapter);
337 configureDirectory(adapter, destinationName);
338 return adapter;
339 }
340
341 private void configureDirectory(KahaDBPersistenceAdapter adapter, String fileName) {
342 File directory = null;
343 if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
344 // not set so inherit from mkahadb
345 directory = getDirectory();
346 } else {
347 directory = adapter.getDirectory();
348 }
349 if (fileName != null) {
350 directory = new File(directory, fileName);
351 }
352 adapter.setDirectory(directory);
353 }
354
355 private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
356 adapters.add(adapter);
357 FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter);
358 put(destination, result);
359 return result;
360 }
361
362 private void configureAdapter(KahaDBPersistenceAdapter adapter) {
363 // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
364 adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
365 adapter.setBrokerService(getBrokerService());
366 }
367
368 private KahaDBPersistenceAdapter kahaDBFromTemplate(KahaDBPersistenceAdapter template) {
369 Map<String, Object> configuration = new HashMap<String, Object>();
370 IntrospectionSupport.getProperties(template, configuration, null);
371 KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
372 IntrospectionSupport.setProperties(adapter, configuration);
373 return adapter;
374 }
375
376 public void stop() throws Exception {
377 for (PersistenceAdapter persistenceAdapter : adapters) {
378 persistenceAdapter.stop();
379 }
380 }
381
382 public File getDirectory() {
383 return this.directory;
384 }
385
386 @Override
387 public void setDirectory(File directory) {
388 this.directory = directory;
389 }
390
391 public void setBrokerService(BrokerService brokerService) {
392 for (KahaDBPersistenceAdapter persistenceAdapter : adapters) {
393 persistenceAdapter.setBrokerService(brokerService);
394 }
395 this.brokerService = brokerService;
396 }
397
398 public BrokerService getBrokerService() {
399 return brokerService;
400 }
401
402 public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
403 this.transactionStore = transactionStore;
404 }
405
406 /**
407 * Set the max file length of the transaction journal
408 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
409 * be used
410 *
411 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
412 */
413 public void setJournalMaxFileLength(int maxFileLength) {
414 transactionStore.setJournalMaxFileLength(maxFileLength);
415 }
416
417 public int getJournalMaxFileLength() {
418 return transactionStore.getJournalMaxFileLength();
419 }
420
421 /**
422 * Set the max write batch size of the transaction journal
423 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
424 * be used
425 *
426 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
427 */
428 public void setJournalWriteBatchSize(int journalWriteBatchSize) {
429 transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
430 }
431
432 public int getJournalWriteBatchSize() {
433 return transactionStore.getJournalMaxWriteBatchSize();
434 }
435
436 @Override
437 public String toString() {
438 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
439 return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
440 }
441
442 }