001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.memory;
018    
019    import org.apache.activemq.broker.ConnectionContext;
020    import org.apache.activemq.command.Message;
021    import org.apache.activemq.command.MessageAck;
022    import org.apache.activemq.command.MessageId;
023    import org.apache.activemq.command.TransactionId;
024    import org.apache.activemq.command.XATransactionId;
025    import org.apache.activemq.store.AbstractMessageStore;
026    import org.apache.activemq.store.MessageStore;
027    import org.apache.activemq.store.PersistenceAdapter;
028    import org.apache.activemq.store.ProxyMessageStore;
029    import org.apache.activemq.store.ProxyTopicMessageStore;
030    import org.apache.activemq.store.TopicMessageStore;
031    import org.apache.activemq.store.TransactionRecoveryListener;
032    import org.apache.activemq.store.TransactionStore;
033    
034    import java.io.IOException;
035    import java.util.ArrayList;
036    import java.util.Iterator;
037    import java.util.concurrent.ConcurrentHashMap;
038    import java.util.concurrent.Future;
039    
040    /**
041     * Provides a TransactionStore implementation that can create transaction aware
042     * MessageStore objects from non transaction aware MessageStore objects.
043     *
044     *
045     */
046    public class MemoryTransactionStore implements TransactionStore {
047    
048        protected ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
049        protected ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
050        protected final PersistenceAdapter persistenceAdapter;
051    
052        private boolean doingRecover;
053    
054        public class Tx {
055            public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
056    
057            public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
058    
059            public void add(AddMessageCommand msg) {
060                messages.add(msg);
061            }
062    
063            public void add(RemoveMessageCommand ack) {
064                acks.add(ack);
065            }
066    
067            public Message[] getMessages() {
068                Message rc[] = new Message[messages.size()];
069                int count = 0;
070                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
071                    AddMessageCommand cmd = iter.next();
072                    rc[count++] = cmd.getMessage();
073                }
074                return rc;
075            }
076    
077            public MessageAck[] getAcks() {
078                MessageAck rc[] = new MessageAck[acks.size()];
079                int count = 0;
080                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
081                    RemoveMessageCommand cmd = iter.next();
082                    rc[count++] = cmd.getMessageAck();
083                }
084                return rc;
085            }
086    
087            /**
088             * @throws IOException
089             */
090            public void commit() throws IOException {
091                ConnectionContext ctx = new ConnectionContext();
092                persistenceAdapter.beginTransaction(ctx);
093                try {
094    
095                    // Do all the message adds.
096                    for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
097                        AddMessageCommand cmd = iter.next();
098                        cmd.run(ctx);
099                    }
100                    // And removes..
101                    for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
102                        RemoveMessageCommand cmd = iter.next();
103                        cmd.run(ctx);
104                    }
105    
106                } catch ( IOException e ) {
107                    persistenceAdapter.rollbackTransaction(ctx);
108                    throw e;
109                }
110                persistenceAdapter.commitTransaction(ctx);
111            }
112        }
113    
114        public interface AddMessageCommand {
115            Message getMessage();
116    
117            MessageStore getMessageStore();
118    
119            void run(ConnectionContext context) throws IOException;
120        }
121    
122        public interface RemoveMessageCommand {
123            MessageAck getMessageAck();
124    
125            void run(ConnectionContext context) throws IOException;
126    
127            MessageStore getMessageStore();
128        }
129    
130        public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
131            this.persistenceAdapter=persistenceAdapter;
132        }
133    
134        public MessageStore proxy(MessageStore messageStore) {
135            return new ProxyMessageStore(messageStore) {
136                @Override
137                public void addMessage(ConnectionContext context, final Message send) throws IOException {
138                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
139                }
140    
141                @Override
142                public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
143                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
144                }
145    
146                @Override
147                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
148                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
149                    return AbstractMessageStore.FUTURE;
150                 }
151    
152                @Override
153                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
154                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
155                    return AbstractMessageStore.FUTURE;
156                 }
157    
158                @Override
159                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
160                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
161                }
162    
163                @Override
164                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
165                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
166                }
167            };
168        }
169    
170        public TopicMessageStore proxy(TopicMessageStore messageStore) {
171            ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) {
172                @Override
173                public void addMessage(ConnectionContext context, final Message send) throws IOException {
174                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
175                }
176    
177                @Override
178                public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
179                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
180                }
181    
182                @Override
183                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
184                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
185                    return AbstractMessageStore.FUTURE;
186                 }
187    
188                @Override
189                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
190                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
191                    return AbstractMessageStore.FUTURE;
192                 }
193    
194                @Override
195                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
196                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
197                }
198    
199                @Override
200                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
201                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
202                }
203    
204                @Override
205                public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
206                                MessageId messageId, MessageAck ack) throws IOException {
207                    MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
208                            subscriptionName, messageId, ack);
209                }
210            };
211            onProxyTopicStore(proxyTopicMessageStore);
212            return proxyTopicMessageStore;
213        }
214    
215        protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
216        }
217    
218        /**
219         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
220         */
221        public void prepare(TransactionId txid) throws IOException {
222            Tx tx = inflightTransactions.remove(txid);
223            if (tx == null) {
224                return;
225            }
226            preparedTransactions.put(txid, tx);
227        }
228    
229        public Tx getTx(Object txid) {
230            Tx tx = inflightTransactions.get(txid);
231            if (tx == null) {
232                tx = new Tx();
233                inflightTransactions.put(txid, tx);
234            }
235            return tx;
236        }
237    
238        public Tx getPreparedTx(TransactionId txid) {
239            Tx tx = preparedTransactions.get(txid);
240            if (tx == null) {
241                tx = new Tx();
242                preparedTransactions.put(txid, tx);
243            }
244            return tx;
245        }
246    
247        public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
248            if (preCommit != null) {
249                preCommit.run();
250            }
251            Tx tx;
252            if (wasPrepared) {
253                tx = preparedTransactions.remove(txid);
254            } else {
255                tx = inflightTransactions.remove(txid);
256            }
257    
258            if (tx != null) {
259                tx.commit();
260            }
261            if (postCommit != null) {
262                postCommit.run();
263            }
264        }
265    
266        /**
267         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
268         */
269        public void rollback(TransactionId txid) throws IOException {
270            preparedTransactions.remove(txid);
271            inflightTransactions.remove(txid);
272        }
273    
274        public void start() throws Exception {
275        }
276    
277        public void stop() throws Exception {
278        }
279    
280        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
281            // All the inflight transactions get rolled back..
282            inflightTransactions.clear();
283            this.doingRecover = true;
284            try {
285                for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
286                    Object txid = iter.next();
287                    Tx tx = preparedTransactions.get(txid);
288                    listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
289                    onRecovered(tx);
290                }
291            } finally {
292                this.doingRecover = false;
293            }
294        }
295    
296        protected void onRecovered(Tx tx) {
297        }
298    
299        /**
300         * @param message
301         * @throws IOException
302         */
303        void addMessage(final MessageStore destination, final Message message) throws IOException {
304    
305            if (doingRecover) {
306                return;
307            }
308    
309            if (message.getTransactionId() != null) {
310                Tx tx = getTx(message.getTransactionId());
311                tx.add(new AddMessageCommand() {
312                    public Message getMessage() {
313                        return message;
314                    }
315    
316                    @Override
317                    public MessageStore getMessageStore() {
318                        return destination;
319                    }
320    
321                    public void run(ConnectionContext ctx) throws IOException {
322                        destination.addMessage(ctx, message);
323                    }
324    
325                });
326            } else {
327                destination.addMessage(null, message);
328            }
329        }
330    
331        /**
332         * @param ack
333         * @throws IOException
334         */
335        final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
336            if (doingRecover) {
337                return;
338            }
339    
340            if (ack.isInTransaction()) {
341                Tx tx = getTx(ack.getTransactionId());
342                tx.add(new RemoveMessageCommand() {
343                    public MessageAck getMessageAck() {
344                        return ack;
345                    }
346    
347                    public void run(ConnectionContext ctx) throws IOException {
348                        destination.removeMessage(ctx, ack);
349                    }
350    
351                    @Override
352                    public MessageStore getMessageStore() {
353                        return destination;
354                    }
355                });
356            } else {
357                destination.removeMessage(null, ack);
358            }
359        }
360    
361        public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
362                               final MessageId messageId, final MessageAck ack) throws IOException {
363            if (doingRecover) {
364                return;
365            }
366    
367            if (ack.isInTransaction()) {
368                Tx tx = getTx(ack.getTransactionId());
369                tx.add(new RemoveMessageCommand() {
370                    public MessageAck getMessageAck() {
371                        return ack;
372                    }
373    
374                    public void run(ConnectionContext ctx) throws IOException {
375                        destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
376                    }
377    
378                    @Override
379                    public MessageStore getMessageStore() {
380                        return destination;
381                    }
382                });
383            } else {
384                destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
385            }
386        }
387    
388    
389        public void delete() {
390            inflightTransactions.clear();
391            preparedTransactions.clear();
392            doingRecover = false;
393        }
394    
395    }