001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.File;
020    import java.io.FileFilter;
021    import java.io.IOException;
022    import java.nio.charset.Charset;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.LinkedList;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Set;
029    
030    import org.apache.activemq.broker.BrokerService;
031    import org.apache.activemq.broker.BrokerServiceAware;
032    import org.apache.activemq.broker.ConnectionContext;
033    import org.apache.activemq.command.ActiveMQDestination;
034    import org.apache.activemq.command.ActiveMQQueue;
035    import org.apache.activemq.command.ActiveMQTopic;
036    import org.apache.activemq.command.LocalTransactionId;
037    import org.apache.activemq.command.ProducerId;
038    import org.apache.activemq.command.TransactionId;
039    import org.apache.activemq.command.XATransactionId;
040    import org.apache.activemq.filter.AnyDestination;
041    import org.apache.activemq.filter.DestinationMap;
042    import org.apache.activemq.protobuf.Buffer;
043    import org.apache.activemq.store.MessageStore;
044    import org.apache.activemq.store.PersistenceAdapter;
045    import org.apache.activemq.store.TopicMessageStore;
046    import org.apache.activemq.store.TransactionStore;
047    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
048    import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
049    import org.apache.activemq.usage.SystemUsage;
050    import org.apache.activemq.util.IOHelper;
051    import org.apache.activemq.util.IntrospectionSupport;
052    import org.slf4j.Logger;
053    import org.slf4j.LoggerFactory;
054    
055    /**
056     * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
057     * distribution of destinations across multiple kahaDB persistence adapters
058     *
059     * @org.apache.xbean.XBean element="mKahaDB"
060     */
061    public class MultiKahaDBPersistenceAdapter extends DestinationMap implements PersistenceAdapter, BrokerServiceAware {
062        static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
063    
064        final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
065        final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
066    
067        BrokerService brokerService;
068        List<KahaDBPersistenceAdapter> adapters = new LinkedList<KahaDBPersistenceAdapter>();
069        private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
070    
071        MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
072    
073        // all local store transactions are XA, 2pc if more than one adapter involved
074        TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
075            @Override
076            public KahaTransactionInfo transform(TransactionId txid) {
077                if (txid == null) {
078                    return null;
079                }
080                KahaTransactionInfo rc = new KahaTransactionInfo();
081                KahaXATransactionId kahaTxId = new KahaXATransactionId();
082                if (txid.isLocalTransaction()) {
083                    LocalTransactionId t = (LocalTransactionId) txid;
084                    kahaTxId.setBranchQualifier(new Buffer(Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"))));
085                    kahaTxId.setGlobalTransactionId(new Buffer(t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"))));
086                    kahaTxId.setFormatId(LOCAL_FORMAT_ID_MAGIC);
087                } else {
088                    XATransactionId t = (XATransactionId) txid;
089                    kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
090                    kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
091                    kahaTxId.setFormatId(t.getFormatId());
092                }
093                rc.setXaTransactionId(kahaTxId);
094                return rc;
095            }
096        };
097    
098        /**
099         * Sets the  FilteredKahaDBPersistenceAdapter entries
100         *
101         * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
102         */
103        @SuppressWarnings({ "rawtypes", "unchecked" })
104        public void setFilteredPersistenceAdapters(List entries) {
105            for (Object entry : entries) {
106                FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
107                KahaDBPersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
108                if (filteredAdapter.getDestination() == null) {
109                    filteredAdapter.setDestination(matchAll);
110                }
111    
112                if (filteredAdapter.isPerDestination()) {
113                    configureDirectory(adapter, null);
114                    // per destination adapters will be created on demand or during recovery
115                    continue;
116                } else {
117                    configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
118                }
119    
120                configureAdapter(adapter);
121                adapters.add(adapter);
122            }
123            super.setEntries(entries);
124        }
125    
126        private String nameFromDestinationFilter(ActiveMQDestination destination) {
127            return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
128        }
129    
130        public boolean isLocalXid(TransactionId xid) {
131            return xid instanceof XATransactionId &&
132                    ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
133        }
134    
135        public void beginTransaction(ConnectionContext context) throws IOException {
136            throw new IllegalStateException();
137        }
138    
139        public void checkpoint(final boolean sync) throws IOException {
140            for (PersistenceAdapter persistenceAdapter : adapters) {
141                persistenceAdapter.checkpoint(sync);
142            }
143        }
144    
145        public void commitTransaction(ConnectionContext context) throws IOException {
146            throw new IllegalStateException();
147        }
148    
149        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
150            PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
151            return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
152        }
153    
154        private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) {
155            Object result = this.chooseValue(destination);
156            if (result == null) {
157                throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
158            }
159            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
160            if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
161                result = addAdapter(filteredAdapter, destination);
162                startAdapter(((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter(), destination.getQualifiedName());
163                if (LOG.isTraceEnabled()) {
164                    LOG.info("created per destination adapter for: " + destination  + ", " + result);
165                }
166            }
167            return ((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter();
168        }
169    
170        private void startAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
171            try {
172                kahaDBPersistenceAdapter.start();
173            } catch (Exception e) {
174                RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
175                LOG.error(detail.toString(), e);
176                throw detail;
177            }
178        }
179    
180        private void stopAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
181            try {
182                kahaDBPersistenceAdapter.stop();
183            } catch (Exception e) {
184                RuntimeException detail = new RuntimeException("Failed to stop per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
185                LOG.error(detail.toString(), e);
186                throw detail;
187            }
188        }
189    
190        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
191            PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
192            return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
193        }
194    
195        public TransactionStore createTransactionStore() throws IOException {
196            return transactionStore;
197        }
198    
199        public void deleteAllMessages() throws IOException {
200            for (PersistenceAdapter persistenceAdapter : adapters) {
201                persistenceAdapter.deleteAllMessages();
202            }
203            transactionStore.deleteAllMessages();
204            IOHelper.deleteChildren(getDirectory());
205        }
206    
207        public Set<ActiveMQDestination> getDestinations() {
208            Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
209            for (PersistenceAdapter persistenceAdapter : adapters) {
210                results.addAll(persistenceAdapter.getDestinations());
211            }
212            return results;
213        }
214    
215        public long getLastMessageBrokerSequenceId() throws IOException {
216            long maxId = -1;
217            for (PersistenceAdapter persistenceAdapter : adapters) {
218                maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
219            }
220            return maxId;
221        }
222    
223        public long getLastProducerSequenceId(ProducerId id) throws IOException {
224            long maxId = -1;
225            for (PersistenceAdapter persistenceAdapter : adapters) {
226                maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
227            }
228            return maxId;
229        }
230    
231        public void removeQueueMessageStore(ActiveMQQueue destination) {
232            PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination);
233            adapter.removeQueueMessageStore(destination);
234            if (adapter instanceof KahaDBPersistenceAdapter) {
235                adapter.removeQueueMessageStore(destination);
236                removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
237            }
238        }
239    
240        public void removeTopicMessageStore(ActiveMQTopic destination) {
241            PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination);
242            if (adapter instanceof KahaDBPersistenceAdapter) {
243                adapter.removeTopicMessageStore(destination);
244                removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
245            }
246        }
247    
248        private void removeMessageStore(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
249            if (adapter.getDestinations().isEmpty()) {
250                stopAdapter(adapter, destination.toString());
251                File adapterDir = adapter.getDirectory();
252                if (adapterDir != null) {
253                    if (IOHelper.deleteFile(adapterDir)) {
254                        if (LOG.isTraceEnabled()) {
255                            LOG.info("deleted per destination adapter directory for: " + destination);
256                        }
257                    } else {
258                        if (LOG.isTraceEnabled()) {
259                            LOG.info("failed to deleted per destination adapter directory for: " + destination);
260                        }
261                    }
262                }
263            }
264        }
265    
266        public void rollbackTransaction(ConnectionContext context) throws IOException {
267            throw new IllegalStateException();
268        }
269    
270        public void setBrokerName(String brokerName) {
271            for (PersistenceAdapter persistenceAdapter : adapters) {
272                persistenceAdapter.setBrokerName(brokerName);
273            }
274        }
275    
276        public void setUsageManager(SystemUsage usageManager) {
277            for (PersistenceAdapter persistenceAdapter : adapters) {
278                persistenceAdapter.setUsageManager(usageManager);
279            }
280        }
281    
282        public long size() {
283            long size = 0;
284            for (PersistenceAdapter persistenceAdapter : adapters) {
285                size += persistenceAdapter.size();
286            }
287            return size;
288        }
289    
290        public void start() throws Exception {
291            Object result = this.chooseValue(matchAll);
292            if (result != null) {
293                FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
294                if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
295                    findAndRegisterExistingAdapters(filteredAdapter);
296                }
297            }
298            for (PersistenceAdapter persistenceAdapter : adapters) {
299                persistenceAdapter.start();
300            }
301        }
302    
303        private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) {
304            FileFilter destinationNames = new FileFilter() {
305                @Override
306                public boolean accept(File file) {
307                    return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
308                }
309            };
310            File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
311            if (candidates != null) {
312                for (File candidate : candidates) {
313                    registerExistingAdapter(template, candidate);
314                }
315            }
316        }
317    
318        private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) {
319            KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
320            startAdapter(adapter, candidate.getName());
321            Set<ActiveMQDestination> destinations = adapter.getDestinations();
322            if (destinations.size() != 0) {
323                registerAdapter(adapter, destinations.toArray(new ActiveMQDestination[]{})[0]);
324            } else {
325                stopAdapter(adapter, candidate.getName());
326            }
327        }
328    
329        private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) {
330            KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination));
331            return registerAdapter(adapter, destination);
332        }
333    
334        private KahaDBPersistenceAdapter adapterFromTemplate(KahaDBPersistenceAdapter template, String destinationName) {
335            KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template);
336            configureAdapter(adapter);
337            configureDirectory(adapter, destinationName);
338            return adapter;
339        }
340    
341        private void configureDirectory(KahaDBPersistenceAdapter adapter, String fileName) {
342            File directory = null;
343            if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
344                // not set so inherit from mkahadb
345                directory = getDirectory();
346            } else {
347                directory = adapter.getDirectory();
348            }
349            if (fileName != null) {
350                directory = new File(directory, fileName);
351            }
352            adapter.setDirectory(directory);
353        }
354    
355        private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
356            adapters.add(adapter);
357            FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter);
358            put(destination, result);
359            return result;
360        }
361    
362        private void configureAdapter(KahaDBPersistenceAdapter adapter) {
363            // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
364            adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
365            adapter.setBrokerService(getBrokerService());
366        }
367    
368        private KahaDBPersistenceAdapter kahaDBFromTemplate(KahaDBPersistenceAdapter template) {
369            Map<String, Object> configuration = new HashMap<String, Object>();
370            IntrospectionSupport.getProperties(template, configuration, null);
371            KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
372            IntrospectionSupport.setProperties(adapter, configuration);
373            return adapter;
374        }
375    
376        public void stop() throws Exception {
377            for (PersistenceAdapter persistenceAdapter : adapters) {
378                persistenceAdapter.stop();
379            }
380        }
381    
382        public File getDirectory() {
383            return this.directory;
384        }
385    
386        @Override
387        public void setDirectory(File directory) {
388            this.directory = directory;
389        }
390    
391        public void setBrokerService(BrokerService brokerService) {
392            for (KahaDBPersistenceAdapter persistenceAdapter : adapters) {
393                persistenceAdapter.setBrokerService(brokerService);
394            }
395            this.brokerService = brokerService;
396        }
397    
398        public BrokerService getBrokerService() {
399            return brokerService;
400        }
401    
402        public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
403            this.transactionStore = transactionStore;
404        }
405    
406        /**
407         * Set the max file length of the transaction journal
408         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
409         * be used
410         *
411         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
412         */
413        public void setJournalMaxFileLength(int maxFileLength) {
414            transactionStore.setJournalMaxFileLength(maxFileLength);
415        }
416    
417        public int getJournalMaxFileLength() {
418            return transactionStore.getJournalMaxFileLength();
419        }
420    
421        /**
422         * Set the max write batch size of  the transaction journal
423         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
424         * be used
425         *
426         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
427         */
428        public void setJournalWriteBatchSize(int journalWriteBatchSize) {
429            transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
430        }
431    
432        public int getJournalWriteBatchSize() {
433            return transactionStore.getJournalMaxWriteBatchSize();
434        }
435    
436        @Override
437        public String toString() {
438            String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
439            return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
440        }
441    
442    }