001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadb;
018    
019    import java.io.DataInputStream;
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.concurrent.CancellationException;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.ExecutionException;
028    import java.util.concurrent.Future;
029    
030    import org.apache.activemq.broker.ConnectionContext;
031    import org.apache.activemq.command.Message;
032    import org.apache.activemq.command.MessageAck;
033    import org.apache.activemq.command.MessageId;
034    import org.apache.activemq.command.TransactionId;
035    import org.apache.activemq.command.XATransactionId;
036    import org.apache.activemq.openwire.OpenWireFormat;
037    import org.apache.activemq.protobuf.Buffer;
038    import org.apache.activemq.store.AbstractMessageStore;
039    import org.apache.activemq.store.MessageStore;
040    import org.apache.activemq.store.ProxyMessageStore;
041    import org.apache.activemq.store.ProxyTopicMessageStore;
042    import org.apache.activemq.store.TopicMessageStore;
043    import org.apache.activemq.store.TransactionRecoveryListener;
044    import org.apache.activemq.store.TransactionStore;
045    import org.apache.activemq.store.kahadb.MessageDatabase.AddOpperation;
046    import org.apache.activemq.store.kahadb.MessageDatabase.Operation;
047    import org.apache.activemq.store.kahadb.MessageDatabase.RemoveOpperation;
048    import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
049    import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
050    import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
051    import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
052    import org.apache.activemq.wireformat.WireFormat;
053    import org.slf4j.Logger;
054    import org.slf4j.LoggerFactory;
055    
056    /**
057     * Provides a TransactionStore implementation that can create transaction aware
058     * MessageStore objects from non transaction aware MessageStore objects.
059     *
060     *
061     */
062    public class KahaDBTransactionStore implements TransactionStore {
063        static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
064        ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
065        private final WireFormat wireFormat = new OpenWireFormat();
066        private final KahaDBStore theStore;
067    
068        public KahaDBTransactionStore(KahaDBStore theStore) {
069            this.theStore = theStore;
070        }
071    
072        public class Tx {
073            private final ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
074    
075            private final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
076    
077            public void add(AddMessageCommand msg) {
078                messages.add(msg);
079            }
080    
081            public void add(RemoveMessageCommand ack) {
082                acks.add(ack);
083            }
084    
085            public Message[] getMessages() {
086                Message rc[] = new Message[messages.size()];
087                int count = 0;
088                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
089                    AddMessageCommand cmd = iter.next();
090                    rc[count++] = cmd.getMessage();
091                }
092                return rc;
093            }
094    
095            public MessageAck[] getAcks() {
096                MessageAck rc[] = new MessageAck[acks.size()];
097                int count = 0;
098                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
099                    RemoveMessageCommand cmd = iter.next();
100                    rc[count++] = cmd.getMessageAck();
101                }
102                return rc;
103            }
104    
105            /**
106             * @return true if something to commit
107             * @throws IOException
108             */
109            public List<Future<Object>> commit() throws IOException {
110                List<Future<Object>> results = new ArrayList<Future<Object>>();
111                // Do all the message adds.
112                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
113                    AddMessageCommand cmd = iter.next();
114                    results.add(cmd.run());
115    
116                }
117                // And removes..
118                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
119                    RemoveMessageCommand cmd = iter.next();
120                    cmd.run();
121                    results.add(cmd.run());
122                }
123    
124                return results;
125            }
126        }
127    
128        public abstract class AddMessageCommand {
129            private final ConnectionContext ctx;
130            AddMessageCommand(ConnectionContext ctx) {
131                this.ctx = ctx;
132            }
133            abstract Message getMessage();
134            Future<Object> run() throws IOException {
135                return run(this.ctx);
136            }
137            abstract Future<Object> run(ConnectionContext ctx) throws IOException;
138        }
139    
140        public abstract class RemoveMessageCommand {
141    
142            private final ConnectionContext ctx;
143            RemoveMessageCommand(ConnectionContext ctx) {
144                this.ctx = ctx;
145            }
146            abstract MessageAck getMessageAck();
147            Future<Object> run() throws IOException {
148                return run(this.ctx);
149            }
150            abstract Future<Object> run(ConnectionContext context) throws IOException;
151        }
152    
153        public MessageStore proxy(MessageStore messageStore) {
154            return new ProxyMessageStore(messageStore) {
155                @Override
156                public void addMessage(ConnectionContext context, final Message send) throws IOException {
157                    KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
158                }
159    
160                @Override
161                public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
162                    KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
163                }
164    
165                @Override
166                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
167                    return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
168                }
169    
170                @Override
171                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
172                    return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message);
173                }
174    
175                @Override
176                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
177                    KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
178                }
179    
180                @Override
181                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
182                    KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
183                }
184            };
185        }
186    
187        public TopicMessageStore proxy(TopicMessageStore messageStore) {
188            return new ProxyTopicMessageStore(messageStore) {
189                @Override
190                public void addMessage(ConnectionContext context, final Message send) throws IOException {
191                    KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
192                }
193    
194                @Override
195                public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
196                    KahaDBTransactionStore.this.addMessage(context, getDelegate(), send);
197                }
198    
199                @Override
200                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
201                    return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
202                }
203    
204                @Override
205                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
206                    return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message);
207                }
208    
209                @Override
210                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
211                    KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack);
212                }
213    
214                @Override
215                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
216                    KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack);
217                }
218    
219                @Override
220                public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
221                                MessageId messageId, MessageAck ack) throws IOException {
222                    KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId,
223                            subscriptionName, messageId, ack);
224                }
225    
226            };
227        }
228    
229        /**
230         * @throws IOException
231         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
232         */
233        public void prepare(TransactionId txid) throws IOException {
234            KahaTransactionInfo info = getTransactionInfo(txid);
235            if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
236                theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
237            } else {
238                Tx tx = inflightTransactions.remove(txid);
239                if (tx != null) {
240                   theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
241                }
242            }
243        }
244    
245        public Tx getTx(Object txid) {
246            Tx tx = inflightTransactions.get(txid);
247            if (tx == null) {
248                tx = new Tx();
249                inflightTransactions.put(txid, tx);
250            }
251            return tx;
252        }
253    
254        public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
255                throws IOException {
256            if (txid != null) {
257                if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
258                    if (preCommit != null) {
259                        preCommit.run();
260                    }
261                    Tx tx = inflightTransactions.remove(txid);
262                    if (tx != null) {
263                        List<Future<Object>> results = tx.commit();
264                        boolean doneSomething = false;
265                        for (Future<Object> result : results) {
266                            try {
267                                result.get();
268                            } catch (InterruptedException e) {
269                                theStore.brokerService.handleIOException(new IOException(e.getMessage()));
270                            } catch (ExecutionException e) {
271                                theStore.brokerService.handleIOException(new IOException(e.getMessage()));
272                            }catch(CancellationException e) {
273                            }
274                            if (!result.isCancelled()) {
275                                doneSomething = true;
276                            }
277                        }
278                        if (postCommit != null) {
279                            postCommit.run();
280                        }
281                        if (doneSomething) {
282                            KahaTransactionInfo info = getTransactionInfo(txid);
283                            theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, null, null);
284                        }
285                    }else {
286                        //The Tx will be null for failed over clients - lets run their post commits
287                        if (postCommit != null) {
288                            postCommit.run();
289                        }
290                    }
291    
292                } else {
293                    KahaTransactionInfo info = getTransactionInfo(txid);
294                    theStore.store(new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
295                    forgetRecoveredAcks(txid);
296                }
297            }else {
298               LOG.error("Null transaction passed on commit");
299            }
300        }
301    
302        /**
303         * @throws IOException
304         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
305         */
306        public void rollback(TransactionId txid) throws IOException {
307            if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
308                KahaTransactionInfo info = getTransactionInfo(txid);
309                theStore.store(new KahaRollbackCommand().setTransactionInfo(info), false, null, null);
310                forgetRecoveredAcks(txid);
311            } else {
312                inflightTransactions.remove(txid);
313            }
314        }
315    
316        protected void forgetRecoveredAcks(TransactionId txid) throws IOException {
317            if (txid.isXATransaction()) {
318                XATransactionId xaTid = ((XATransactionId) txid);
319                theStore.forgetRecoveredAcks(xaTid.getPreparedAcks());
320            }
321        }
322    
323        public void start() throws Exception {
324        }
325    
326        public void stop() throws Exception {
327        }
328    
329        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
330            for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
331                XATransactionId xid = (XATransactionId) entry.getKey();
332                ArrayList<Message> messageList = new ArrayList<Message>();
333                ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
334    
335                for (Operation op : entry.getValue()) {
336                    if (op.getClass() == AddOpperation.class) {
337                        AddOpperation addOp = (AddOpperation) op;
338                        Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addOp.getCommand().getMessage()
339                                .newInput()));
340                        messageList.add(msg);
341                    } else {
342                        RemoveOpperation rmOp = (RemoveOpperation) op;
343                        Buffer ackb = rmOp.getCommand().getAck();
344                        MessageAck ack = (MessageAck) wireFormat.unmarshal(new DataInputStream(ackb.newInput()));
345                        ackList.add(ack);
346                    }
347                }
348    
349                Message[] addedMessages = new Message[messageList.size()];
350                MessageAck[] acks = new MessageAck[ackList.size()];
351                messageList.toArray(addedMessages);
352                ackList.toArray(acks);
353                xid.setPreparedAcks(ackList);
354                theStore.trackRecoveredAcks(ackList);
355                listener.recover(xid, addedMessages, acks);
356            }
357        }
358    
359        /**
360         * @param message
361         * @throws IOException
362         */
363        void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
364                throws IOException {
365    
366            if (message.getTransactionId() != null) {
367                if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
368                    destination.addMessage(context, message);
369                } else {
370                    Tx tx = getTx(message.getTransactionId());
371                    tx.add(new AddMessageCommand(context) {
372                        @Override
373                        public Message getMessage() {
374                            return message;
375                        }
376                        @Override
377                        public Future<Object> run(ConnectionContext ctx) throws IOException {
378                            destination.addMessage(ctx, message);
379                            return AbstractMessageStore.FUTURE;
380                        }
381    
382                    });
383                }
384            } else {
385                destination.addMessage(context, message);
386            }
387        }
388    
389        Future<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
390                throws IOException {
391    
392            if (message.getTransactionId() != null) {
393                if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
394                    destination.addMessage(context, message);
395                    return AbstractMessageStore.FUTURE;
396                } else {
397                    Tx tx = getTx(message.getTransactionId());
398                    tx.add(new AddMessageCommand(context) {
399                        @Override
400                        public Message getMessage() {
401                            return message;
402                        }
403                        @Override
404                        public Future<Object> run(ConnectionContext ctx) throws IOException {
405                            return destination.asyncAddQueueMessage(ctx, message);
406                        }
407    
408                    });
409                    return AbstractMessageStore.FUTURE;
410                }
411            } else {
412                return destination.asyncAddQueueMessage(context, message);
413            }
414        }
415    
416        Future<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message)
417                throws IOException {
418    
419            if (message.getTransactionId() != null) {
420                if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
421                    destination.addMessage(context, message);
422                    return AbstractMessageStore.FUTURE;
423                } else {
424                    Tx tx = getTx(message.getTransactionId());
425                    tx.add(new AddMessageCommand(context) {
426                        @Override
427                        public Message getMessage() {
428                            return message;
429                        }
430                        @Override
431                        public Future<Object> run(ConnectionContext ctx) throws IOException {
432                            return destination.asyncAddTopicMessage(ctx, message);
433                        }
434    
435                    });
436                    return AbstractMessageStore.FUTURE;
437                }
438            } else {
439                return destination.asyncAddTopicMessage(context, message);
440            }
441        }
442    
443        /**
444         * @param ack
445         * @throws IOException
446         */
447        final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
448                throws IOException {
449    
450            if (ack.isInTransaction()) {
451                if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
452                    destination.removeMessage(context, ack);
453                } else {
454                    Tx tx = getTx(ack.getTransactionId());
455                    tx.add(new RemoveMessageCommand(context) {
456                        @Override
457                        public MessageAck getMessageAck() {
458                            return ack;
459                        }
460    
461                        @Override
462                        public Future<Object> run(ConnectionContext ctx) throws IOException {
463                            destination.removeMessage(ctx, ack);
464                            return AbstractMessageStore.FUTURE;
465                        }
466                    });
467                }
468            } else {
469                destination.removeMessage(context, ack);
470            }
471        }
472    
473        final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
474                throws IOException {
475    
476            if (ack.isInTransaction()) {
477                if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
478                    destination.removeAsyncMessage(context, ack);
479                } else {
480                    Tx tx = getTx(ack.getTransactionId());
481                    tx.add(new RemoveMessageCommand(context) {
482                        @Override
483                        public MessageAck getMessageAck() {
484                            return ack;
485                        }
486    
487                        @Override
488                        public Future<Object> run(ConnectionContext ctx) throws IOException {
489                            destination.removeMessage(ctx, ack);
490                            return AbstractMessageStore.FUTURE;
491                        }
492                    });
493                }
494            } else {
495                destination.removeAsyncMessage(context, ack);
496            }
497        }
498    
499        final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
500                               final MessageId messageId, final MessageAck ack) throws IOException {
501    
502            if (ack.isInTransaction()) {
503                if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
504                    destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
505                } else {
506                    Tx tx = getTx(ack.getTransactionId());
507                    tx.add(new RemoveMessageCommand(context) {
508                        public MessageAck getMessageAck() {
509                            return ack;
510                        }
511    
512                        public Future<Object> run(ConnectionContext ctx) throws IOException {
513                            destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
514                            return AbstractMessageStore.FUTURE;
515                        }
516                    });
517                }
518            } else {
519                destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
520            }
521        }
522    
523    
524        private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
525            return theStore.getTransactionIdTransformer().transform(txid);
526        }
527    }