001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.File;
020    import java.io.FileFilter;
021    import java.io.IOException;
022    import java.nio.charset.Charset;
023    import java.util.HashMap;
024    import java.util.HashSet;
025    import java.util.LinkedList;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Set;
029    
030    import org.apache.activemq.broker.BrokerService;
031    import org.apache.activemq.broker.BrokerServiceAware;
032    import org.apache.activemq.broker.ConnectionContext;
033    import org.apache.activemq.command.ActiveMQDestination;
034    import org.apache.activemq.command.ActiveMQQueue;
035    import org.apache.activemq.command.ActiveMQTopic;
036    import org.apache.activemq.command.LocalTransactionId;
037    import org.apache.activemq.command.ProducerId;
038    import org.apache.activemq.command.TransactionId;
039    import org.apache.activemq.command.XATransactionId;
040    import org.apache.activemq.filter.AnyDestination;
041    import org.apache.activemq.filter.DestinationMap;
042    import org.apache.activemq.protobuf.Buffer;
043    import org.apache.activemq.store.MessageStore;
044    import org.apache.activemq.store.PersistenceAdapter;
045    import org.apache.activemq.store.TopicMessageStore;
046    import org.apache.activemq.store.TransactionStore;
047    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
048    import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
049    import org.apache.activemq.usage.SystemUsage;
050    import org.apache.activemq.util.IOHelper;
051    import org.apache.activemq.util.IntrospectionSupport;
052    import org.slf4j.Logger;
053    import org.slf4j.LoggerFactory;
054    
055    /**
056     * An implementation of {@link org.apache.activemq.store.PersistenceAdapter}  that supports
057     * distribution of destinations across multiple kahaDB persistence adapters
058     *
059     * @org.apache.xbean.XBean element="mKahaDB"
060     */
061    public class MultiKahaDBPersistenceAdapter extends DestinationMap implements PersistenceAdapter, BrokerServiceAware {
062        static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBPersistenceAdapter.class);
063    
064        final static ActiveMQDestination matchAll = new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")});
065        final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
066    
067        BrokerService brokerService;
068        List<KahaDBPersistenceAdapter> adapters = new LinkedList<KahaDBPersistenceAdapter>();
069        private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
070    
071        MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
072    
073        // all local store transactions are XA, 2pc if more than one adapter involved
074        TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
075            @Override
076            public KahaTransactionInfo transform(TransactionId txid) {
077                if (txid == null) {
078                    return null;
079                }
080                KahaTransactionInfo rc = new KahaTransactionInfo();
081                KahaXATransactionId kahaTxId = new KahaXATransactionId();
082                if (txid.isLocalTransaction()) {
083                    LocalTransactionId t = (LocalTransactionId) txid;
084                    kahaTxId.setBranchQualifier(new Buffer(Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"))));
085                    kahaTxId.setGlobalTransactionId(new Buffer(t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"))));
086                    kahaTxId.setFormatId(LOCAL_FORMAT_ID_MAGIC);
087                } else {
088                    XATransactionId t = (XATransactionId) txid;
089                    kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
090                    kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
091                    kahaTxId.setFormatId(t.getFormatId());
092                }
093                rc.setXaTransactionId(kahaTxId);
094                return rc;
095            }
096        };
097    
098        /**
099         * Sets the  FilteredKahaDBPersistenceAdapter entries
100         *
101         * @org.apache.xbean.ElementType class="org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter"
102         */
103        @SuppressWarnings({ "rawtypes", "unchecked" })
104        public void setFilteredPersistenceAdapters(List entries) {
105            for (Object entry : entries) {
106                FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
107                KahaDBPersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
108                if (filteredAdapter.getDestination() == null) {
109                    filteredAdapter.setDestination(matchAll);
110                }
111    
112                if (filteredAdapter.isPerDestination()) {
113                    configureDirectory(adapter, null);
114                    // per destination adapters will be created on demand or during recovery
115                    continue;
116                } else {
117                    configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
118                }
119    
120                configureAdapter(adapter);
121                adapters.add(adapter);
122            }
123            super.setEntries(entries);
124        }
125    
126        private String nameFromDestinationFilter(ActiveMQDestination destination) {
127            if (destination.getQualifiedName().length() > IOHelper.getMaxFileNameLength()) {
128                LOG.warn("Destination name is longer than 'MaximumFileNameLength' system property, " +
129                         "potential problem with recovery can result from name truncation.");
130            }
131    
132            return IOHelper.toFileSystemSafeName(destination.getQualifiedName());
133        }
134    
135        public boolean isLocalXid(TransactionId xid) {
136            return xid instanceof XATransactionId &&
137                    ((XATransactionId)xid).getFormatId() == LOCAL_FORMAT_ID_MAGIC;
138        }
139    
140        @Override
141        public void beginTransaction(ConnectionContext context) throws IOException {
142            throw new IllegalStateException();
143        }
144    
145        @Override
146        public void checkpoint(final boolean sync) throws IOException {
147            for (PersistenceAdapter persistenceAdapter : adapters) {
148                persistenceAdapter.checkpoint(sync);
149            }
150        }
151    
152        @Override
153        public void commitTransaction(ConnectionContext context) throws IOException {
154            throw new IllegalStateException();
155        }
156    
157        @Override
158        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
159            PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
160            return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
161        }
162    
163        private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) {
164            Object result = this.chooseValue(destination);
165            if (result == null) {
166                throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
167            }
168            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
169            if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
170                result = addAdapter(filteredAdapter, destination);
171                startAdapter(((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter(), destination.getQualifiedName());
172                if (LOG.isTraceEnabled()) {
173                    LOG.info("created per destination adapter for: " + destination  + ", " + result);
174                }
175            }
176            return ((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter();
177        }
178    
179        private void startAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
180            try {
181                kahaDBPersistenceAdapter.start();
182            } catch (Exception e) {
183                RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
184                LOG.error(detail.toString(), e);
185                throw detail;
186            }
187        }
188    
189        private void stopAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
190            try {
191                kahaDBPersistenceAdapter.stop();
192            } catch (Exception e) {
193                RuntimeException detail = new RuntimeException("Failed to stop per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
194                LOG.error(detail.toString(), e);
195                throw detail;
196            }
197        }
198    
199        @Override
200        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
201            PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
202            return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
203        }
204    
205        @Override
206        public TransactionStore createTransactionStore() throws IOException {
207            return transactionStore;
208        }
209    
210        @Override
211        public void deleteAllMessages() throws IOException {
212            for (PersistenceAdapter persistenceAdapter : adapters) {
213                persistenceAdapter.deleteAllMessages();
214            }
215            transactionStore.deleteAllMessages();
216            IOHelper.deleteChildren(getDirectory());
217        }
218    
219        @Override
220        public Set<ActiveMQDestination> getDestinations() {
221            Set<ActiveMQDestination> results = new HashSet<ActiveMQDestination>();
222            for (PersistenceAdapter persistenceAdapter : adapters) {
223                results.addAll(persistenceAdapter.getDestinations());
224            }
225            return results;
226        }
227    
228        @Override
229        public long getLastMessageBrokerSequenceId() throws IOException {
230            long maxId = -1;
231            for (PersistenceAdapter persistenceAdapter : adapters) {
232                maxId = Math.max(maxId, persistenceAdapter.getLastMessageBrokerSequenceId());
233            }
234            return maxId;
235        }
236    
237        @Override
238        public long getLastProducerSequenceId(ProducerId id) throws IOException {
239            long maxId = -1;
240            for (PersistenceAdapter persistenceAdapter : adapters) {
241                maxId = Math.max(maxId, persistenceAdapter.getLastProducerSequenceId(id));
242            }
243            return maxId;
244        }
245    
246        @Override
247        public void removeQueueMessageStore(ActiveMQQueue destination) {
248            PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination);
249            if (adapter instanceof KahaDBPersistenceAdapter) {
250                adapter.removeQueueMessageStore(destination);
251                removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
252                removeAll(destination);
253            }
254        }
255    
256        @Override
257        public void removeTopicMessageStore(ActiveMQTopic destination) {
258            PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination);
259            if (adapter instanceof KahaDBPersistenceAdapter) {
260                adapter.removeTopicMessageStore(destination);
261                removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
262                removeAll(destination);
263            }
264        }
265    
266        private void removeMessageStore(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
267            if (adapter.getDestinations().isEmpty()) {
268                stopAdapter(adapter, destination.toString());
269                File adapterDir = adapter.getDirectory();
270                if (adapterDir != null) {
271                    if (IOHelper.deleteFile(adapterDir)) {
272                        if (LOG.isTraceEnabled()) {
273                            LOG.info("deleted per destination adapter directory for: " + destination);
274                        }
275                    } else {
276                        if (LOG.isTraceEnabled()) {
277                            LOG.info("failed to deleted per destination adapter directory for: " + destination);
278                        }
279                    }
280                }
281            }
282        }
283    
284        @Override
285        public void rollbackTransaction(ConnectionContext context) throws IOException {
286            throw new IllegalStateException();
287        }
288    
289        @Override
290        public void setBrokerName(String brokerName) {
291            for (PersistenceAdapter persistenceAdapter : adapters) {
292                persistenceAdapter.setBrokerName(brokerName);
293            }
294        }
295    
296        @Override
297        public void setUsageManager(SystemUsage usageManager) {
298            for (PersistenceAdapter persistenceAdapter : adapters) {
299                persistenceAdapter.setUsageManager(usageManager);
300            }
301        }
302    
303        @Override
304        public long size() {
305            long size = 0;
306            for (PersistenceAdapter persistenceAdapter : adapters) {
307                size += persistenceAdapter.size();
308            }
309            return size;
310        }
311    
312        @Override
313        public void start() throws Exception {
314            Object result = this.chooseValue(matchAll);
315            if (result != null) {
316                FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
317                if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
318                    findAndRegisterExistingAdapters(filteredAdapter);
319                }
320            }
321            for (PersistenceAdapter persistenceAdapter : adapters) {
322                persistenceAdapter.start();
323            }
324        }
325    
326        private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) {
327            FileFilter destinationNames = new FileFilter() {
328                @Override
329                public boolean accept(File file) {
330                    return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
331                }
332            };
333            File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
334            if (candidates != null) {
335                for (File candidate : candidates) {
336                    registerExistingAdapter(template, candidate);
337                }
338            }
339        }
340    
341        private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) {
342            KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
343            startAdapter(adapter, candidate.getName());
344            Set<ActiveMQDestination> destinations = adapter.getDestinations();
345            if (destinations.size() != 0) {
346                registerAdapter(adapter, destinations.toArray(new ActiveMQDestination[]{})[0]);
347            } else {
348                stopAdapter(adapter, candidate.getName());
349            }
350        }
351    
352        private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) {
353            KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination));
354            return registerAdapter(adapter, destination);
355        }
356    
357        private KahaDBPersistenceAdapter adapterFromTemplate(KahaDBPersistenceAdapter template, String destinationName) {
358            KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template);
359            configureAdapter(adapter);
360            configureDirectory(adapter, destinationName);
361            return adapter;
362        }
363    
364        private void configureDirectory(KahaDBPersistenceAdapter adapter, String fileName) {
365            File directory = null;
366            if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
367                // not set so inherit from mkahadb
368                directory = getDirectory();
369            } else {
370                directory = adapter.getDirectory();
371            }
372            if (fileName != null) {
373                directory = new File(directory, fileName);
374            }
375            adapter.setDirectory(directory);
376        }
377    
378        private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
379            adapters.add(adapter);
380            FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter);
381            put(destination, result);
382            return result;
383        }
384    
385        private void configureAdapter(KahaDBPersistenceAdapter adapter) {
386            // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
387            adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
388            adapter.setBrokerService(getBrokerService());
389        }
390    
391        private KahaDBPersistenceAdapter kahaDBFromTemplate(KahaDBPersistenceAdapter template) {
392            Map<String, Object> configuration = new HashMap<String, Object>();
393            IntrospectionSupport.getProperties(template, configuration, null);
394            KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
395            IntrospectionSupport.setProperties(adapter, configuration);
396            return adapter;
397        }
398    
399        @Override
400        public void stop() throws Exception {
401            for (PersistenceAdapter persistenceAdapter : adapters) {
402                persistenceAdapter.stop();
403            }
404        }
405    
406        @Override
407        public File getDirectory() {
408            return this.directory;
409        }
410    
411        @Override
412        public void setDirectory(File directory) {
413            this.directory = directory;
414        }
415    
416        @Override
417        public void setBrokerService(BrokerService brokerService) {
418            for (KahaDBPersistenceAdapter persistenceAdapter : adapters) {
419                persistenceAdapter.setBrokerService(brokerService);
420            }
421            this.brokerService = brokerService;
422        }
423    
424        public BrokerService getBrokerService() {
425            return brokerService;
426        }
427    
428        public void setTransactionStore(MultiKahaDBTransactionStore transactionStore) {
429            this.transactionStore = transactionStore;
430        }
431    
432        /**
433         * Set the max file length of the transaction journal
434         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
435         * be used
436         *
437         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
438         */
439        public void setJournalMaxFileLength(int maxFileLength) {
440            transactionStore.setJournalMaxFileLength(maxFileLength);
441        }
442    
443        public int getJournalMaxFileLength() {
444            return transactionStore.getJournalMaxFileLength();
445        }
446    
447        /**
448         * Set the max write batch size of  the transaction journal
449         * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
450         * be used
451         *
452         * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
453         */
454        public void setJournalWriteBatchSize(int journalWriteBatchSize) {
455            transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
456        }
457    
458        public int getJournalWriteBatchSize() {
459            return transactionStore.getJournalMaxWriteBatchSize();
460        }
461    
462        @Override
463        public String toString() {
464            String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
465            return "MultiKahaDBPersistenceAdapter[" + path + "]" + adapters;
466        }
467    }