001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.util.Date;
022    import java.util.HashSet;
023    import java.util.Set;
024    import java.util.TreeSet;
025    import java.util.concurrent.ConcurrentHashMap;
026    import java.util.concurrent.Future;
027    
028    import org.apache.activemq.broker.Broker;
029    import org.apache.activemq.broker.ConnectionContext;
030    import org.apache.activemq.command.Message;
031    import org.apache.activemq.command.MessageAck;
032    import org.apache.activemq.command.MessageId;
033    import org.apache.activemq.command.TransactionId;
034    import org.apache.activemq.command.XATransactionId;
035    import org.apache.activemq.store.*;
036    import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
037    import org.apache.activemq.store.kahadb.data.KahaEntryType;
038    import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
039    import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
040    import org.apache.activemq.util.IOHelper;
041    import org.apache.activemq.store.kahadb.disk.journal.Journal;
042    import org.apache.activemq.store.kahadb.disk.journal.Location;
043    import org.apache.activemq.util.DataByteArrayInputStream;
044    import org.apache.activemq.util.DataByteArrayOutputStream;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    public class MultiKahaDBTransactionStore implements TransactionStore {
049        static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
050        final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
051        final ConcurrentHashMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
052        final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
053        private Journal journal;
054        private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
055        private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
056    
057        public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
058            this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
059        }
060    
061        public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
062            return new ProxyMessageStore(messageStore) {
063                @Override
064                public void addMessage(ConnectionContext context, final Message send) throws IOException {
065                    MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
066                }
067    
068                @Override
069                public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
070                    MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
071                }
072    
073                @Override
074                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
075                    return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
076                }
077    
078                @Override
079                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
080                    return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
081                }
082    
083                @Override
084                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
085                    MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
086                }
087    
088                @Override
089                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
090                    MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
091                }
092            };
093        }
094    
095        public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
096            return new ProxyTopicMessageStore(messageStore) {
097                @Override
098                public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
099                    MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
100                }
101    
102                @Override
103                public void addMessage(ConnectionContext context, final Message send) throws IOException {
104                    MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
105                }
106    
107                @Override
108                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
109                    return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
110                }
111    
112                @Override
113                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
114                    return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
115                }
116    
117                @Override
118                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
119                    MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
120                }
121    
122                @Override
123                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
124                    MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
125                }
126    
127                @Override
128                public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
129                                        MessageId messageId, MessageAck ack) throws IOException {
130                    MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
131                            subscriptionName, messageId, ack);
132                }
133            };
134        }
135    
136        public void deleteAllMessages() {
137            IOHelper.deleteChildren(getDirectory());
138        }
139    
140        public int getJournalMaxFileLength() {
141            return journalMaxFileLength;
142        }
143    
144        public void setJournalMaxFileLength(int journalMaxFileLength) {
145            this.journalMaxFileLength = journalMaxFileLength;
146        }
147    
148        public int getJournalMaxWriteBatchSize() {
149            return journalWriteBatchSize;
150        }
151    
152        public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
153            this.journalWriteBatchSize = journalWriteBatchSize;
154        }
155    
156        public class Tx {
157            private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
158            private int prepareLocationId = 0;
159    
160            public void trackStore(TransactionStore store) {
161                stores.add(store);
162            }
163    
164            public Set<TransactionStore> getStores() {
165                return stores;
166            }
167    
168            public void trackPrepareLocation(Location location) {
169                this.prepareLocationId = location.getDataFileId();
170            }
171    
172            public int getPreparedLocationId() {
173                return prepareLocationId;
174            }
175        }
176    
177        public Tx getTx(TransactionId txid) {
178            Tx tx = inflightTransactions.get(txid);
179            if (tx == null) {
180                tx = new Tx();
181                inflightTransactions.put(txid, tx);
182            }
183            return tx;
184        }
185    
186        public Tx removeTx(TransactionId txid) {
187            return inflightTransactions.remove(txid);
188        }
189    
190        public void prepare(TransactionId txid) throws IOException {
191            Tx tx = getTx(txid);
192            for (TransactionStore store : tx.getStores()) {
193                store.prepare(txid);
194            }
195        }
196    
197        public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
198                throws IOException {
199    
200            if (preCommit != null) {
201                preCommit.run();
202            }
203    
204            Tx tx = getTx(txid);
205            if (wasPrepared) {
206                for (TransactionStore store : tx.getStores()) {
207                    store.commit(txid, true, null, null);
208                }
209            } else {
210                // can only do 1pc on a single store
211                if (tx.getStores().size() == 1) {
212                    for (TransactionStore store : tx.getStores()) {
213                        store.commit(txid, false, null, null);
214                    }
215                } else {
216                    // need to do local 2pc
217                    for (TransactionStore store : tx.getStores()) {
218                        store.prepare(txid);
219                    }
220                    persistOutcome(tx, txid);
221                    for (TransactionStore store : tx.getStores()) {
222                        store.commit(txid, true, null, null);
223                    }
224                    persistCompletion(txid);
225                }
226            }
227            removeTx(txid);
228            if (postCommit != null) {
229                postCommit.run();
230            }
231        }
232    
233        public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
234            tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
235        }
236    
237        public void persistCompletion(TransactionId txid) throws IOException {
238            store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
239        }
240    
241        private Location store(JournalCommand<?> data) throws IOException {
242            int size = data.serializedSizeFramed();
243            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
244            os.writeByte(data.type().getNumber());
245            data.writeFramed(os);
246            Location location = journal.write(os.toByteSequence(), true);
247            journal.setLastAppendLocation(location);
248            return location;
249        }
250    
251        public void rollback(TransactionId txid) throws IOException {
252            Tx tx = removeTx(txid);
253            if (tx != null) {
254                for (TransactionStore store : tx.getStores()) {
255                    store.rollback(txid);
256                }
257            }
258        }
259    
260        public void start() throws Exception {
261            journal = new Journal() {
262                @Override
263                protected void cleanup() {
264                    super.cleanup();
265                    txStoreCleanup();
266                }
267            };
268            journal.setDirectory(getDirectory());
269            journal.setMaxFileLength(journalMaxFileLength);
270            journal.setWriteBatchSize(journalWriteBatchSize);
271            IOHelper.mkdirs(journal.getDirectory());
272            journal.start();
273            recoverPendingLocalTransactions();
274            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
275        }
276    
277        private void txStoreCleanup() {
278            Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
279            for (Tx tx : inflightTransactions.values()) {
280                knownDataFileIds.remove(tx.getPreparedLocationId());
281            }
282            try {
283                journal.removeDataFiles(knownDataFileIds);
284            } catch (Exception e) {
285                LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds);
286            }
287        }
288    
289        private File getDirectory() {
290            return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
291        }
292    
293        public void stop() throws Exception {
294            journal.close();
295            journal = null;
296        }
297    
298        private void recoverPendingLocalTransactions() throws IOException {
299            Location location = journal.getNextLocation(null);
300            while (location != null) {
301                process(load(location));
302                location = journal.getNextLocation(location);
303            }
304            recoveredPendingCommit.addAll(inflightTransactions.keySet());
305            LOG.info("pending local transactions: " + recoveredPendingCommit);
306        }
307    
308        public JournalCommand<?> load(Location location) throws IOException {
309            DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
310            byte readByte = is.readByte();
311            KahaEntryType type = KahaEntryType.valueOf(readByte);
312            if (type == null) {
313                throw new IOException("Could not load journal record. Invalid location: " + location);
314            }
315            JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
316            message.mergeFramed(is);
317            return message;
318        }
319    
320        public void process(JournalCommand<?> command) throws IOException {
321            switch (command.type()) {
322                case KAHA_PREPARE_COMMAND:
323                    KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
324                    getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
325                    break;
326                case KAHA_COMMIT_COMMAND:
327                    KahaCommitCommand commitCommand = (KahaCommitCommand) command;
328                    removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
329                    break;
330                case KAHA_TRACE_COMMAND:
331                    break;
332                default:
333                    throw new IOException("Unexpected command in transaction journal: " + command);
334            }
335        }
336    
337    
338        public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
339    
340            for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
341                adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
342                    @Override
343                    public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
344                        try {
345                            getTx(xid).trackStore(adapter.createTransactionStore());
346                        } catch (IOException e) {
347                            LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
348                        }
349                        listener.recover(xid, addedMessages, acks);
350                    }
351                });
352            }
353    
354            try {
355                Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
356                // force completion of local xa
357                for (TransactionId txid : broker.getPreparedTransactions(null)) {
358                    if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
359                        try {
360                            if (recoveredPendingCommit.contains(txid)) {
361                                LOG.info("delivering pending commit outcome for tid: " + txid);
362                                broker.commitTransaction(null, txid, false);
363    
364                            } else {
365                                LOG.info("delivering rollback outcome to store for tid: " + txid);
366                                broker.forgetTransaction(null, txid);
367                            }
368                            persistCompletion(txid);
369                        } catch (Exception ex) {
370                            LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
371                        }
372                    }
373                }
374            } catch (Exception e) {
375                LOG.error("failed to resolve pending local transactions", e);
376            }
377        }
378    
379        void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
380                throws IOException {
381            if (message.getTransactionId() != null) {
382                getTx(message.getTransactionId()).trackStore(transactionStore);
383            }
384            destination.addMessage(context, message);
385        }
386    
387        Future<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
388                throws IOException {
389            if (message.getTransactionId() != null) {
390                getTx(message.getTransactionId()).trackStore(transactionStore);
391                destination.addMessage(context, message);
392                return AbstractMessageStore.FUTURE;
393            } else {
394                return destination.asyncAddQueueMessage(context, message);
395            }
396        }
397    
398        Future<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
399                throws IOException {
400    
401            if (message.getTransactionId() != null) {
402                getTx(message.getTransactionId()).trackStore(transactionStore);
403                destination.addMessage(context, message);
404                return AbstractMessageStore.FUTURE;
405            } else {
406                return destination.asyncAddTopicMessage(context, message);
407            }
408        }
409    
410        final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
411                throws IOException {
412            if (ack.getTransactionId() != null) {
413                getTx(ack.getTransactionId()).trackStore(transactionStore);
414            }
415            destination.removeMessage(context, ack);
416        }
417    
418        final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
419                throws IOException {
420            if (ack.getTransactionId() != null) {
421                getTx(ack.getTransactionId()).trackStore(transactionStore);
422            }
423            destination.removeAsyncMessage(context, ack);
424        }
425    
426        final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
427                               final String clientId, final String subscriptionName,
428                               final MessageId messageId, final MessageAck ack) throws IOException {
429            if (ack.getTransactionId() != null) {
430                getTx(ack.getTransactionId()).trackStore(transactionStore);
431            }
432            destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
433        }
434    }