001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.Iterator;
023import java.util.LinkedHashMap;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.command.MessageAck;
031import org.apache.activemq.command.MessageId;
032import org.apache.activemq.command.TransactionId;
033import org.apache.activemq.command.XATransactionId;
034import org.apache.activemq.store.InlineListenableFuture;
035import org.apache.activemq.store.ListenableFuture;
036import org.apache.activemq.store.MessageStore;
037import org.apache.activemq.store.PersistenceAdapter;
038import org.apache.activemq.store.ProxyMessageStore;
039import org.apache.activemq.store.ProxyTopicMessageStore;
040import org.apache.activemq.store.TopicMessageStore;
041import org.apache.activemq.store.TransactionRecoveryListener;
042import org.apache.activemq.store.TransactionStore;
043
044/**
045 * Provides a TransactionStore implementation that can create transaction aware
046 * MessageStore objects from non transaction aware MessageStore objects.
047 */
048public class MemoryTransactionStore implements TransactionStore {
049
050    protected ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
051    protected Map<TransactionId, Tx> preparedTransactions = Collections.synchronizedMap(new LinkedHashMap<TransactionId, Tx>());
052    protected final PersistenceAdapter persistenceAdapter;
053
054    private boolean doingRecover;
055
056    public class Tx {
057
058        public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
059
060        public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
061
062        public void add(AddMessageCommand msg) {
063            messages.add(msg);
064        }
065
066        public void add(RemoveMessageCommand ack) {
067            acks.add(ack);
068        }
069
070        public Message[] getMessages() {
071            Message rc[] = new Message[messages.size()];
072            int count = 0;
073            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
074                AddMessageCommand cmd = iter.next();
075                rc[count++] = cmd.getMessage();
076            }
077            return rc;
078        }
079
080        public MessageAck[] getAcks() {
081            MessageAck rc[] = new MessageAck[acks.size()];
082            int count = 0;
083            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
084                RemoveMessageCommand cmd = iter.next();
085                rc[count++] = cmd.getMessageAck();
086            }
087            return rc;
088        }
089
090        /**
091         * @throws IOException
092         */
093        public void commit() throws IOException {
094            ConnectionContext ctx = new ConnectionContext();
095            persistenceAdapter.beginTransaction(ctx);
096            try {
097
098                // Do all the message adds.
099                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
100                    AddMessageCommand cmd = iter.next();
101                    cmd.run(ctx);
102                }
103                // And removes..
104                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
105                    RemoveMessageCommand cmd = iter.next();
106                    cmd.run(ctx);
107                }
108
109            } catch (IOException e) {
110                persistenceAdapter.rollbackTransaction(ctx);
111                throw e;
112            }
113            persistenceAdapter.commitTransaction(ctx);
114        }
115    }
116
117    public interface AddMessageCommand {
118        Message getMessage();
119
120        MessageStore getMessageStore();
121
122        void run(ConnectionContext context) throws IOException;
123
124        void setMessageStore(MessageStore messageStore);
125    }
126
127    public interface RemoveMessageCommand {
128        MessageAck getMessageAck();
129
130        void run(ConnectionContext context) throws IOException;
131
132        MessageStore getMessageStore();
133    }
134
135    public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
136        this.persistenceAdapter = persistenceAdapter;
137    }
138
139    public MessageStore proxy(MessageStore messageStore) {
140        ProxyMessageStore proxyMessageStore = new ProxyMessageStore(messageStore) {
141            @Override
142            public void addMessage(ConnectionContext context, final Message send) throws IOException {
143                MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
144            }
145
146            @Override
147            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
148                MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
149            }
150
151            @Override
152            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
153                MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
154                return new InlineListenableFuture();
155            }
156
157            @Override
158            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
159                MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
160                return new InlineListenableFuture();
161            }
162
163            @Override
164            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
165                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
166            }
167
168            @Override
169            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
170                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
171            }
172        };
173        onProxyQueueStore(proxyMessageStore);
174        return proxyMessageStore;
175    }
176
177    protected void onProxyQueueStore(ProxyMessageStore proxyMessageStore) {
178    }
179
180    public TopicMessageStore proxy(TopicMessageStore messageStore) {
181        ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) {
182            @Override
183            public void addMessage(ConnectionContext context, final Message send) throws IOException {
184                MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
185            }
186
187            @Override
188            public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
189                MemoryTransactionStore.this.addMessage(context, getDelegate(), send);
190            }
191
192            @Override
193            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
194                MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
195                return new InlineListenableFuture();
196            }
197
198            @Override
199            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
200                MemoryTransactionStore.this.addMessage(context, getDelegate(), message);
201                return new InlineListenableFuture();
202            }
203
204            @Override
205            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
206                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
207            }
208
209            @Override
210            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
211                MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
212            }
213
214            @Override
215            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack)
216                throws IOException {
217                MemoryTransactionStore.this.acknowledge((TopicMessageStore) getDelegate(), clientId, subscriptionName, messageId, ack);
218            }
219        };
220        onProxyTopicStore(proxyTopicMessageStore);
221        return proxyTopicMessageStore;
222    }
223
224    protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
225    }
226
227    /**
228     * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
229     */
230    @Override
231    public void prepare(TransactionId txid) throws IOException {
232        Tx tx = inflightTransactions.remove(txid);
233        if (tx == null) {
234            return;
235        }
236        preparedTransactions.put(txid, tx);
237    }
238
239    public Tx getTx(Object txid) {
240        Tx tx = inflightTransactions.get(txid);
241        if (tx == null) {
242            tx = new Tx();
243            inflightTransactions.put(txid, tx);
244        }
245        return tx;
246    }
247
248    public Tx getPreparedTx(TransactionId txid) {
249        Tx tx = preparedTransactions.get(txid);
250        if (tx == null) {
251            tx = new Tx();
252            preparedTransactions.put(txid, tx);
253        }
254        return tx;
255    }
256
257    @Override
258    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) throws IOException {
259        if (preCommit != null) {
260            preCommit.run();
261        }
262        Tx tx;
263        if (wasPrepared) {
264            tx = preparedTransactions.remove(txid);
265        } else {
266            tx = inflightTransactions.remove(txid);
267        }
268
269        if (tx != null) {
270            tx.commit();
271        }
272        if (postCommit != null) {
273            postCommit.run();
274        }
275    }
276
277    /**
278     * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
279     */
280    @Override
281    public void rollback(TransactionId txid) throws IOException {
282        preparedTransactions.remove(txid);
283        inflightTransactions.remove(txid);
284    }
285
286    @Override
287    public void start() throws Exception {
288    }
289
290    @Override
291    public void stop() throws Exception {
292    }
293
294    @Override
295    public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
296        // All the inflight transactions get rolled back..
297        inflightTransactions.clear();
298        this.doingRecover = true;
299        try {
300            for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
301                Object txid = iter.next();
302                Tx tx = preparedTransactions.get(txid);
303                listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
304                onRecovered(tx);
305            }
306        } finally {
307            this.doingRecover = false;
308        }
309    }
310
311    protected void onRecovered(Tx tx) {
312    }
313
314    /**
315     * @param message
316     * @throws IOException
317     */
318    void addMessage(final ConnectionContext context, final MessageStore destination, final Message message) throws IOException {
319
320        if (doingRecover) {
321            return;
322        }
323
324        if (message.getTransactionId() != null) {
325            Tx tx = getTx(message.getTransactionId());
326            tx.add(new AddMessageCommand() {
327                @SuppressWarnings("unused")
328                MessageStore messageStore = destination;
329
330                @Override
331                public Message getMessage() {
332                    return message;
333                }
334
335                @Override
336                public MessageStore getMessageStore() {
337                    return destination;
338                }
339
340                @Override
341                public void run(ConnectionContext ctx) throws IOException {
342                    destination.addMessage(ctx, message);
343                }
344
345                @Override
346                public void setMessageStore(MessageStore messageStore) {
347                    this.messageStore = messageStore;
348                }
349
350            });
351        } else {
352            destination.addMessage(context, message);
353        }
354    }
355
356    /**
357     * @param ack
358     * @throws IOException
359     */
360    final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
361        if (doingRecover) {
362            return;
363        }
364
365        if (ack.isInTransaction()) {
366            Tx tx = getTx(ack.getTransactionId());
367            tx.add(new RemoveMessageCommand() {
368                @Override
369                public MessageAck getMessageAck() {
370                    return ack;
371                }
372
373                @Override
374                public void run(ConnectionContext ctx) throws IOException {
375                    destination.removeMessage(ctx, ack);
376                }
377
378                @Override
379                public MessageStore getMessageStore() {
380                    return destination;
381                }
382            });
383        } else {
384            destination.removeMessage(null, ack);
385        }
386    }
387
388    public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName, final MessageId messageId,
389        final MessageAck ack) throws IOException {
390        if (doingRecover) {
391            return;
392        }
393
394        if (ack.isInTransaction()) {
395            Tx tx = getTx(ack.getTransactionId());
396            tx.add(new RemoveMessageCommand() {
397                @Override
398                public MessageAck getMessageAck() {
399                    return ack;
400                }
401
402                @Override
403                public void run(ConnectionContext ctx) throws IOException {
404                    destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
405                }
406
407                @Override
408                public MessageStore getMessageStore() {
409                    return destination;
410                }
411            });
412        } else {
413            destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
414        }
415    }
416
417    public void delete() {
418        inflightTransactions.clear();
419        preparedTransactions.clear();
420        doingRecover = false;
421    }
422}