001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.memory;
018    
019    import org.apache.activemq.broker.ConnectionContext;
020    import org.apache.activemq.command.Message;
021    import org.apache.activemq.command.MessageAck;
022    import org.apache.activemq.command.MessageId;
023    import org.apache.activemq.command.TransactionId;
024    import org.apache.activemq.command.XATransactionId;
025    import org.apache.activemq.store.AbstractMessageStore;
026    import org.apache.activemq.store.MessageStore;
027    import org.apache.activemq.store.PersistenceAdapter;
028    import org.apache.activemq.store.ProxyMessageStore;
029    import org.apache.activemq.store.ProxyTopicMessageStore;
030    import org.apache.activemq.store.TopicMessageStore;
031    import org.apache.activemq.store.TransactionRecoveryListener;
032    import org.apache.activemq.store.TransactionStore;
033    import org.apache.activemq.store.jdbc.JDBCMessageStore;
034    
035    import java.io.IOException;
036    import java.util.ArrayList;
037    import java.util.Iterator;
038    import java.util.concurrent.ConcurrentHashMap;
039    import java.util.concurrent.Future;
040    
041    /**
042     * Provides a TransactionStore implementation that can create transaction aware
043     * MessageStore objects from non transaction aware MessageStore objects.
044     *
045     *
046     */
047    public class MemoryTransactionStore implements TransactionStore {
048    
049        protected ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
050        protected ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
051        protected final PersistenceAdapter persistenceAdapter;
052    
053        private boolean doingRecover;
054    
055        public class Tx {
056            public ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
057    
058            public final ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
059    
060            public void add(AddMessageCommand msg) {
061                messages.add(msg);
062            }
063    
064            public void add(RemoveMessageCommand ack) {
065                acks.add(ack);
066            }
067    
068            public Message[] getMessages() {
069                Message rc[] = new Message[messages.size()];
070                int count = 0;
071                for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
072                    AddMessageCommand cmd = iter.next();
073                    rc[count++] = cmd.getMessage();
074                }
075                return rc;
076            }
077    
078            public MessageAck[] getAcks() {
079                MessageAck rc[] = new MessageAck[acks.size()];
080                int count = 0;
081                for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
082                    RemoveMessageCommand cmd = iter.next();
083                    rc[count++] = cmd.getMessageAck();
084                }
085                return rc;
086            }
087    
088            /**
089             * @throws IOException
090             */
091            public void commit() throws IOException {
092                ConnectionContext ctx = new ConnectionContext();
093                persistenceAdapter.beginTransaction(ctx);
094                try {
095    
096                    // Do all the message adds.
097                    for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
098                        AddMessageCommand cmd = iter.next();
099                        cmd.run(ctx);
100                    }
101                    // And removes..
102                    for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
103                        RemoveMessageCommand cmd = iter.next();
104                        cmd.run(ctx);
105                    }
106    
107                } catch ( IOException e ) {
108                    persistenceAdapter.rollbackTransaction(ctx);
109                    throw e;
110                }
111                persistenceAdapter.commitTransaction(ctx);
112            }
113        }
114    
115        public interface AddMessageCommand {
116            Message getMessage();
117    
118            MessageStore getMessageStore();
119    
120            void run(ConnectionContext context) throws IOException;
121        }
122    
123        public interface RemoveMessageCommand {
124            MessageAck getMessageAck();
125    
126            void run(ConnectionContext context) throws IOException;
127    
128            MessageStore getMessageStore();
129        }
130    
131        public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
132            this.persistenceAdapter=persistenceAdapter;
133        }
134    
135        public MessageStore proxy(MessageStore messageStore) {
136            return new ProxyMessageStore(messageStore) {
137                @Override
138                public void addMessage(ConnectionContext context, final Message send) throws IOException {
139                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
140                }
141    
142                @Override
143                public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
144                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
145                }
146    
147                @Override
148                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
149                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
150                    return AbstractMessageStore.FUTURE;
151                 }
152    
153                @Override
154                public Future<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException {
155                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
156                    return AbstractMessageStore.FUTURE;
157                 }
158    
159                @Override
160                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
161                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
162                }
163    
164                @Override
165                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
166                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
167                }
168            };
169        }
170    
171        public TopicMessageStore proxy(TopicMessageStore messageStore) {
172            ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) {
173                @Override
174                public void addMessage(ConnectionContext context, final Message send) throws IOException {
175                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
176                }
177    
178                @Override
179                public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
180                    MemoryTransactionStore.this.addMessage(getDelegate(), send);
181                }
182    
183                @Override
184                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
185                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
186                    return AbstractMessageStore.FUTURE;
187                 }
188    
189                @Override
190                public Future<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException {
191                    MemoryTransactionStore.this.addMessage(getDelegate(), message);
192                    return AbstractMessageStore.FUTURE;
193                 }
194    
195                @Override
196                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
197                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
198                }
199    
200                @Override
201                public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
202                    MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
203                }
204    
205                @Override
206                public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
207                                MessageId messageId, MessageAck ack) throws IOException {
208                    MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId,
209                            subscriptionName, messageId, ack);
210                }
211            };
212            onProxyTopicStore(proxyTopicMessageStore);
213            return proxyTopicMessageStore;
214        }
215    
216        protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
217        }
218    
219        /**
220         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
221         */
222        public void prepare(TransactionId txid) throws IOException {
223            Tx tx = inflightTransactions.remove(txid);
224            if (tx == null) {
225                return;
226            }
227            preparedTransactions.put(txid, tx);
228        }
229    
230        public Tx getTx(Object txid) {
231            Tx tx = inflightTransactions.get(txid);
232            if (tx == null) {
233                tx = new Tx();
234                inflightTransactions.put(txid, tx);
235            }
236            return tx;
237        }
238    
239        public Tx getPreparedTx(TransactionId txid) {
240            Tx tx = preparedTransactions.get(txid);
241            if (tx == null) {
242                tx = new Tx();
243                preparedTransactions.put(txid, tx);
244            }
245            return tx;
246        }
247    
248        public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
249            if (preCommit != null) {
250                preCommit.run();
251            }
252            Tx tx;
253            if (wasPrepared) {
254                tx = preparedTransactions.remove(txid);
255            } else {
256                tx = inflightTransactions.remove(txid);
257            }
258    
259            if (tx != null) {
260                tx.commit();
261            }
262            if (postCommit != null) {
263                postCommit.run();
264            }
265        }
266    
267        /**
268         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
269         */
270        public void rollback(TransactionId txid) throws IOException {
271            preparedTransactions.remove(txid);
272            inflightTransactions.remove(txid);
273        }
274    
275        public void start() throws Exception {
276        }
277    
278        public void stop() throws Exception {
279        }
280    
281        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
282            // All the inflight transactions get rolled back..
283            inflightTransactions.clear();
284            this.doingRecover = true;
285            try {
286                for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
287                    Object txid = iter.next();
288                    Tx tx = preparedTransactions.get(txid);
289                    listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
290                    onRecovered(tx);
291                }
292            } finally {
293                this.doingRecover = false;
294            }
295        }
296    
297        protected void onRecovered(Tx tx) {
298        }
299    
300        /**
301         * @param message
302         * @throws IOException
303         */
304        void addMessage(final MessageStore destination, final Message message) throws IOException {
305    
306            if (doingRecover) {
307                return;
308            }
309    
310            if (message.getTransactionId() != null) {
311                Tx tx = getTx(message.getTransactionId());
312                tx.add(new AddMessageCommand() {
313                    public Message getMessage() {
314                        return message;
315                    }
316    
317                    @Override
318                    public MessageStore getMessageStore() {
319                        return destination;
320                    }
321    
322                    public void run(ConnectionContext ctx) throws IOException {
323                        destination.addMessage(ctx, message);
324                    }
325    
326                });
327            } else {
328                destination.addMessage(null, message);
329            }
330        }
331    
332        /**
333         * @param ack
334         * @throws IOException
335         */
336        final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
337            if (doingRecover) {
338                return;
339            }
340    
341            if (ack.isInTransaction()) {
342                Tx tx = getTx(ack.getTransactionId());
343                tx.add(new RemoveMessageCommand() {
344                    public MessageAck getMessageAck() {
345                        return ack;
346                    }
347    
348                    public void run(ConnectionContext ctx) throws IOException {
349                        destination.removeMessage(ctx, ack);
350                    }
351    
352                    @Override
353                    public MessageStore getMessageStore() {
354                        return destination;
355                    }
356                });
357            } else {
358                destination.removeMessage(null, ack);
359            }
360        }
361    
362        public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName,
363                               final MessageId messageId, final MessageAck ack) throws IOException {
364            if (doingRecover) {
365                return;
366            }
367    
368            if (ack.isInTransaction()) {
369                Tx tx = getTx(ack.getTransactionId());
370                tx.add(new RemoveMessageCommand() {
371                    public MessageAck getMessageAck() {
372                        return ack;
373                    }
374    
375                    public void run(ConnectionContext ctx) throws IOException {
376                        destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
377                    }
378    
379                    @Override
380                    public MessageStore getMessageStore() {
381                        return destination;
382                    }
383                });
384            } else {
385                destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
386            }
387        }
388    
389    
390        public void delete() {
391            inflightTransactions.clear();
392            preparedTransactions.clear();
393            doingRecover = false;
394        }
395    
396    }