001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.Date;
022    import java.util.HashSet;
023    import java.util.Set;
024    import java.util.TreeSet;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.Future;
027    
028    import org.apache.activemq.broker.Broker;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.command.Message;
031    import org.apache.activemq.command.MessageAck;
032    import org.apache.activemq.command.MessageId;
033    import org.apache.activemq.command.TransactionId;
034    import org.apache.activemq.command.XATransactionId;
035    import org.apache.activemq.store.AbstractMessageStore;
036    import org.apache.activemq.store.MessageStore;
037    import org.apache.activemq.store.ProxyMessageStore;
038    import org.apache.activemq.store.ProxyTopicMessageStore;
039    import org.apache.activemq.store.TopicMessageStore;
040    import org.apache.activemq.store.TransactionRecoveryListener;
041    import org.apache.activemq.store.TransactionStore;
042    import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
043    import org.apache.activemq.store.kahadb.data.KahaEntryType;
044    import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
045    import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
046    import org.apache.activemq.util.IOHelper;
047    import org.apache.kahadb.journal.Journal;
048    import org.apache.kahadb.journal.Location;
049    import org.apache.kahadb.util.DataByteArrayInputStream;
050    import org.apache.kahadb.util.DataByteArrayOutputStream;
051    import org.slf4j.Logger;
052    import org.slf4j.LoggerFactory;
053    
054    public class MultiKahaDBTransactionStore implements TransactionStore {
055        static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
056        final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
057        final ConcurrentHashMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
058        final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
059        private Journal journal;
060        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
061        private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
062    
063        public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
064            this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
065        }
066    
067        public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
068            return new ProxyMessageStore(messageStore) {
069                @Override
070                public void addMessage(ConnectionContext context, final Message send) throws IOException {
071                    MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
072                }
073    
074                @Override
075                public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
076                    MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
077                }
078    
079                @Override
080                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
081                    return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
082                }
083    
084                @Override
085                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
086                    return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
087                }
088    
089                @Override
090                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
091                    MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
092                }
093    
094                @Override
095                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
096                    MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
097                }
098            };
099        }
100    
101        public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
102            return new ProxyTopicMessageStore(messageStore) {
103                @Override
104                public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
105                    MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
106                }
107    
108                @Override
109                public void addMessage(ConnectionContext context, final Message send) throws IOException {
110                    MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
111                }
112    
113                @Override
114                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
115                    return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
116                }
117    
118                @Override
119                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
120                    return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
121                }
122    
123                @Override
124                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
125                    MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
126                }
127    
128                @Override
129                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
130                    MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
131                }
132    
133                @Override
134                public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
135                                        MessageId messageId, MessageAck ack) throws IOException {
136                    MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
137                            subscriptionName, messageId, ack);
138                }
139            };
140        }
141    
142        public void deleteAllMessages() {
143            IOHelper.deleteChildren(getDirectory());
144        }
145    
146        public int getJournalMaxFileLength() {
147            return journalMaxFileLength;
148        }
149    
150        public void setJournalMaxFileLength(int journalMaxFileLength) {
151            this.journalMaxFileLength = journalMaxFileLength;
152        }
153    
154        public int getJournalMaxWriteBatchSize() {
155            return journalWriteBatchSize;
156        }
157    
158        public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
159            this.journalWriteBatchSize = journalWriteBatchSize;
160        }
161    
162        public class Tx {
163            private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
164            private int prepareLocationId = 0;
165    
166            public void trackStore(TransactionStore store) {
167                stores.add(store);
168            }
169    
170            public Set<TransactionStore> getStores() {
171                return stores;
172            }
173    
174            public void trackPrepareLocation(Location location) {
175                this.prepareLocationId = location.getDataFileId();
176            }
177    
178            public int getPreparedLocationId() {
179                return prepareLocationId;
180            }
181        }
182    
183        public Tx getTx(TransactionId txid) {
184            Tx tx = inflightTransactions.get(txid);
185            if (tx == null) {
186                tx = new Tx();
187                inflightTransactions.put(txid, tx);
188            }
189            return tx;
190        }
191    
192        public Tx removeTx(TransactionId txid) {
193            return inflightTransactions.remove(txid);
194        }
195    
196        public void prepare(TransactionId txid) throws IOException {
197            Tx tx = getTx(txid);
198            for (TransactionStore store : tx.getStores()) {
199                store.prepare(txid);
200            }
201        }
202    
203        public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
204                throws IOException {
205    
206            if (preCommit != null) {
207                preCommit.run();
208            }
209    
210            Tx tx = getTx(txid);
211            if (wasPrepared) {
212                for (TransactionStore store : tx.getStores()) {
213                    store.commit(txid, true, null, null);
214                }
215            } else {
216                // can only do 1pc on a single store
217                if (tx.getStores().size() == 1) {
218                    for (TransactionStore store : tx.getStores()) {
219                        store.commit(txid, false, null, null);
220                    }
221                } else {
222                    // need to do local 2pc
223                    for (TransactionStore store : tx.getStores()) {
224                        store.prepare(txid);
225                    }
226                    persistOutcome(tx, txid);
227                    for (TransactionStore store : tx.getStores()) {
228                        store.commit(txid, true, null, null);
229                    }
230                    persistCompletion(txid);
231                }
232            }
233            removeTx(txid);
234            if (postCommit != null) {
235                postCommit.run();
236            }
237        }
238    
239        public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
240            tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
241        }
242    
243        public void persistCompletion(TransactionId txid) throws IOException {
244            store(new KahaCommitCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)));
245        }
246    
247        private Location store(JournalCommand<?> data) throws IOException {
248            int size = data.serializedSizeFramed();
249            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
250            os.writeByte(data.type().getNumber());
251            data.writeFramed(os);
252            Location location = journal.write(os.toByteSequence(), true);
253            journal.setLastAppendLocation(location);
254            return location;
255        }
256    
257        public void rollback(TransactionId txid) throws IOException {
258            Tx tx = removeTx(txid);
259            if (tx != null) {
260                for (TransactionStore store : tx.getStores()) {
261                    store.rollback(txid);
262                }
263            }
264        }
265    
266        public void start() throws Exception {
267            journal = new Journal() {
268                @Override
269                protected void cleanup() {
270                    super.cleanup();
271                    txStoreCleanup();
272                }
273            };
274            journal.setDirectory(getDirectory());
275            journal.setMaxFileLength(journalMaxFileLength);
276            journal.setWriteBatchSize(journalWriteBatchSize);
277            IOHelper.mkdirs(journal.getDirectory());
278            journal.start();
279            recoverPendingLocalTransactions();
280            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
281        }
282    
283        private void txStoreCleanup() {
284            Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
285            for (Tx tx : inflightTransactions.values()) {
286                knownDataFileIds.remove(tx.getPreparedLocationId());
287            }
288            try {
289                journal.removeDataFiles(knownDataFileIds);
290            } catch (Exception e) {
291                LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds);
292            }
293        }
294    
295        private File getDirectory() {
296            return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
297        }
298    
299        public void stop() throws Exception {
300            journal.close();
301            journal = null;
302        }
303    
304        private void recoverPendingLocalTransactions() throws IOException {
305            Location location = journal.getNextLocation(null);
306            while (location != null) {
307                process(load(location));
308                location = journal.getNextLocation(location);
309            }
310            recoveredPendingCommit.addAll(inflightTransactions.keySet());
311            LOG.info("pending local transactions: " + recoveredPendingCommit);
312        }
313    
314        public JournalCommand<?> load(Location location) throws IOException {
315            DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
316            byte readByte = is.readByte();
317            KahaEntryType type = KahaEntryType.valueOf(readByte);
318            if (type == null) {
319                throw new IOException("Could not load journal record. Invalid location: " + location);
320            }
321            JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
322            message.mergeFramed(is);
323            return message;
324        }
325    
326        public void process(JournalCommand<?> command) throws IOException {
327            switch (command.type()) {
328                case KAHA_PREPARE_COMMAND:
329                    KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
330                    getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
331                    break;
332                case KAHA_COMMIT_COMMAND:
333                    KahaCommitCommand commitCommand = (KahaCommitCommand) command;
334                    removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
335                    break;
336                case KAHA_TRACE_COMMAND:
337                    break;
338                default:
339                    throw new IOException("Unexpected command in transaction journal: " + command);
340            }
341        }
342    
343    
344        public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
345    
346            for (final KahaDBPersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
347                adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
348                    @Override
349                    public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
350                        try {
351                            getTx(xid).trackStore(adapter.createTransactionStore());
352                        } catch (IOException e) {
353                            LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
354                        }
355                        listener.recover(xid, addedMessages, acks);
356                    }
357                });
358            }
359    
360            try {
361                Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
362                // force completion of local xa
363                for (TransactionId txid : broker.getPreparedTransactions(null)) {
364                    if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
365                        try {
366                            if (recoveredPendingCommit.contains(txid)) {
367                                LOG.info("delivering pending commit outcome for tid: " + txid);
368                                broker.commitTransaction(null, txid, false);
369    
370                            } else {
371                                LOG.info("delivering rollback outcome to store for tid: " + txid);
372                                broker.forgetTransaction(null, txid);
373                            }
374                            persistCompletion(txid);
375                        } catch (Exception ex) {
376                            LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
377                        }
378                    }
379                }
380            } catch (Exception e) {
381                LOG.error("failed to resolve pending local transactions", e);
382            }
383        }
384    
385        void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
386                throws IOException {
387            if (message.getTransactionId() != null) {
388                getTx(message.getTransactionId()).trackStore(transactionStore);
389            }
390            destination.addMessage(context, message);
391        }
392    
393        Future<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
394                throws IOException {
395            if (message.getTransactionId() != null) {
396                getTx(message.getTransactionId()).trackStore(transactionStore);
397                destination.addMessage(context, message);
398                return AbstractMessageStore.FUTURE;
399            } else {
400                return destination.asyncAddQueueMessage(context, message);
401            }
402        }
403    
404        Future<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
405                throws IOException {
406    
407            if (message.getTransactionId() != null) {
408                getTx(message.getTransactionId()).trackStore(transactionStore);
409                destination.addMessage(context, message);
410                return AbstractMessageStore.FUTURE;
411            } else {
412                return destination.asyncAddTopicMessage(context, message);
413            }
414        }
415    
416        final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
417                throws IOException {
418            if (ack.getTransactionId() != null) {
419                getTx(ack.getTransactionId()).trackStore(transactionStore);
420            }
421            destination.removeMessage(context, ack);
422        }
423    
424        final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
425                throws IOException {
426            if (ack.getTransactionId() != null) {
427                getTx(ack.getTransactionId()).trackStore(transactionStore);
428            }
429            destination.removeAsyncMessage(context, ack);
430        }
431    
432        final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
433                               final String clientId, final String subscriptionName,
434                               final MessageId messageId, final MessageAck ack) throws IOException {
435            if (ack.getTransactionId() != null) {
436                getTx(ack.getTransactionId()).trackStore(transactionStore);
437            }
438            destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
439        }
440    }