001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.store.journal;
019    
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.Iterator;
023    import java.util.LinkedHashMap;
024    import java.util.Map;
025    import javax.transaction.xa.XAException;
026    import org.apache.activeio.journal.RecordLocation;
027    import org.apache.activemq.command.JournalTopicAck;
028    import org.apache.activemq.command.JournalTransaction;
029    import org.apache.activemq.command.Message;
030    import org.apache.activemq.command.MessageAck;
031    import org.apache.activemq.command.TransactionId;
032    import org.apache.activemq.command.XATransactionId;
033    import org.apache.activemq.store.TransactionRecoveryListener;
034    import org.apache.activemq.store.TransactionStore;
035    
036    /**
037     */
038    public class JournalTransactionStore implements TransactionStore {
039    
040        private final JournalPersistenceAdapter peristenceAdapter;
041        private final Map<Object, Tx> inflightTransactions = new LinkedHashMap<Object, Tx>();
042        private final Map<TransactionId, Tx> preparedTransactions = new LinkedHashMap<TransactionId, Tx>();
043        private boolean doingRecover;
044    
045        public static class TxOperation {
046    
047            static final byte ADD_OPERATION_TYPE = 0;
048            static final byte REMOVE_OPERATION_TYPE = 1;
049            static final byte ACK_OPERATION_TYPE = 3;
050    
051            public byte operationType;
052            public JournalMessageStore store;
053            public Object data;
054    
055            public TxOperation(byte operationType, JournalMessageStore store, Object data) {
056                this.operationType = operationType;
057                this.store = store;
058                this.data = data;
059            }
060    
061        }
062    
063        /**
064         * Operations
065         * 
066         * 
067         */
068        public static class Tx {
069    
070            private final RecordLocation location;
071            private final ArrayList<TxOperation> operations = new ArrayList<TxOperation>();
072    
073            public Tx(RecordLocation location) {
074                this.location = location;
075            }
076    
077            public void add(JournalMessageStore store, Message msg) {
078                operations.add(new TxOperation(TxOperation.ADD_OPERATION_TYPE, store, msg));
079            }
080    
081            public void add(JournalMessageStore store, MessageAck ack) {
082                operations.add(new TxOperation(TxOperation.REMOVE_OPERATION_TYPE, store, ack));
083            }
084    
085            public void add(JournalTopicMessageStore store, JournalTopicAck ack) {
086                operations.add(new TxOperation(TxOperation.ACK_OPERATION_TYPE, store, ack));
087            }
088    
089            public Message[] getMessages() {
090                ArrayList<Object> list = new ArrayList<Object>();
091                for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
092                    TxOperation op = iter.next();
093                    if (op.operationType == TxOperation.ADD_OPERATION_TYPE) {
094                        list.add(op.data);
095                    }
096                }
097                Message rc[] = new Message[list.size()];
098                list.toArray(rc);
099                return rc;
100            }
101    
102            public MessageAck[] getAcks() {
103                ArrayList<Object> list = new ArrayList<Object>();
104                for (Iterator<TxOperation> iter = operations.iterator(); iter.hasNext();) {
105                    TxOperation op = iter.next();
106                    if (op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
107                        list.add(op.data);
108                    }
109                }
110                MessageAck rc[] = new MessageAck[list.size()];
111                list.toArray(rc);
112                return rc;
113            }
114    
115            public ArrayList<TxOperation> getOperations() {
116                return operations;
117            }
118    
119        }
120    
121        public JournalTransactionStore(JournalPersistenceAdapter adapter) {
122            this.peristenceAdapter = adapter;
123        }
124    
125        /**
126         * @throws IOException
127         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
128         */
129        public void prepare(TransactionId txid) throws IOException {
130            Tx tx = null;
131            synchronized (inflightTransactions) {
132                tx = inflightTransactions.remove(txid);
133            }
134            if (tx == null) {
135                return;
136            }
137            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false),
138                                           true);
139            synchronized (preparedTransactions) {
140                preparedTransactions.put(txid, tx);
141            }
142        }
143    
144        /**
145         * @throws IOException
146         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
147         */
148        public void replayPrepare(TransactionId txid) throws IOException {
149            Tx tx = null;
150            synchronized (inflightTransactions) {
151                tx = inflightTransactions.remove(txid);
152            }
153            if (tx == null) {
154                return;
155            }
156            synchronized (preparedTransactions) {
157                preparedTransactions.put(txid, tx);
158            }
159        }
160    
161        public Tx getTx(Object txid, RecordLocation location) {
162            Tx tx = null;
163            synchronized (inflightTransactions) {
164                tx = inflightTransactions.get(txid);
165            }
166            if (tx == null) {
167                tx = new Tx(location);
168                inflightTransactions.put(txid, tx);
169            }
170            return tx;
171        }
172    
173        /**
174         * @throws XAException
175         * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
176         */
177        public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
178            Tx tx;
179            if (preCommit != null) {
180                preCommit.run();
181            }
182            if (wasPrepared) {
183                synchronized (preparedTransactions) {
184                    tx = preparedTransactions.remove(txid);
185                }
186            } else {
187                synchronized (inflightTransactions) {
188                    tx = inflightTransactions.remove(txid);
189                }
190            }
191            if (tx == null) {
192                if (postCommit != null) {
193                    postCommit.run();
194                }
195                return;
196            }
197            if (txid.isXATransaction()) {
198                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid,
199                                                                      wasPrepared), true);
200            } else {
201                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid,
202                                                                      wasPrepared), true);
203            }
204            if (postCommit != null) {
205                postCommit.run();
206            }
207        }
208    
209        /**
210         * @throws XAException
211         * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
212         */
213        public Tx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
214            if (wasPrepared) {
215                synchronized (preparedTransactions) {
216                    return preparedTransactions.remove(txid);
217                }
218            } else {
219                synchronized (inflightTransactions) {
220                    return inflightTransactions.remove(txid);
221                }
222            }
223        }
224    
225        /**
226         * @throws IOException
227         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
228         */
229        public void rollback(TransactionId txid) throws IOException {
230            Tx tx = null;
231            synchronized (inflightTransactions) {
232                tx = inflightTransactions.remove(txid);
233            }
234            if (tx != null) {
235                synchronized (preparedTransactions) {
236                    tx = preparedTransactions.remove(txid);
237                }
238            }
239            if (tx != null) {
240                if (txid.isXATransaction()) {
241                    peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid,
242                                                                          false), true);
243                } else {
244                    peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,
245                                                                          txid, false), true);
246                }
247            }
248        }
249    
250        /**
251         * @throws IOException
252         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
253         */
254        public void replayRollback(TransactionId txid) throws IOException {
255            boolean inflight = false;
256            synchronized (inflightTransactions) {
257                inflight = inflightTransactions.remove(txid) != null;
258            }
259            if (inflight) {
260                synchronized (preparedTransactions) {
261                    preparedTransactions.remove(txid);
262                }
263            }
264        }
265    
266        public void start() throws Exception {
267        }
268    
269        public void stop() throws Exception {
270        }
271    
272        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
273            // All the in-flight transactions get rolled back..
274            synchronized (inflightTransactions) {
275                inflightTransactions.clear();
276            }
277            this.doingRecover = true;
278            try {
279                Map<TransactionId, Tx> txs = null;
280                synchronized (preparedTransactions) {
281                    txs = new LinkedHashMap<TransactionId, Tx>(preparedTransactions);
282                }
283                for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
284                    Object txid = iter.next();
285                    Tx tx = txs.get(txid);
286                    listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
287                }
288            } finally {
289                this.doingRecover = false;
290            }
291        }
292    
293        /**
294         * @param message
295         * @throws IOException
296         */
297        void addMessage(JournalMessageStore store, Message message, RecordLocation location) throws IOException {
298            Tx tx = getTx(message.getTransactionId(), location);
299            tx.add(store, message);
300        }
301    
302        /**
303         * @param ack
304         * @throws IOException
305         */
306        public void removeMessage(JournalMessageStore store, MessageAck ack, RecordLocation location)
307            throws IOException {
308            Tx tx = getTx(ack.getTransactionId(), location);
309            tx.add(store, ack);
310        }
311    
312        public void acknowledge(JournalTopicMessageStore store, JournalTopicAck ack, RecordLocation location) {
313            Tx tx = getTx(ack.getTransactionId(), location);
314            tx.add(store, ack);
315        }
316    
317        public RecordLocation checkpoint() throws IOException {
318            // Nothing really to checkpoint.. since, we don't
319            // checkpoint tx operations in to long term store until they are
320            // committed.
321            // But we keep track of the first location of an operation
322            // that was associated with an active tx. The journal can not
323            // roll over active tx records.
324            RecordLocation rc = null;
325            synchronized (inflightTransactions) {
326                for (Iterator<Tx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
327                    Tx tx = iter.next();
328                    RecordLocation location = tx.location;
329                    if (rc == null || rc.compareTo(location) < 0) {
330                        rc = location;
331                    }
332                }
333            }
334            synchronized (preparedTransactions) {
335                for (Iterator<Tx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
336                    Tx tx = iter.next();
337                    RecordLocation location = tx.location;
338                    if (rc == null || rc.compareTo(location) < 0) {
339                        rc = location;
340                    }
341                }
342                return rc;
343            }
344        }
345    
346        public boolean isDoingRecover() {
347            return doingRecover;
348        }
349    
350    }