001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    
018    package org.apache.activemq.store.amq;
019    
020    import java.io.IOException;
021    import java.util.Iterator;
022    import java.util.LinkedHashMap;
023    import java.util.Map;
024    import javax.transaction.xa.XAException;
025    import org.apache.activemq.command.JournalTopicAck;
026    import org.apache.activemq.command.JournalTransaction;
027    import org.apache.activemq.command.Message;
028    import org.apache.activemq.command.MessageAck;
029    import org.apache.activemq.command.TransactionId;
030    import org.apache.activemq.command.XATransactionId;
031    import org.apache.activemq.kaha.impl.async.Location;
032    import org.apache.activemq.store.TransactionRecoveryListener;
033    import org.apache.activemq.store.TransactionStore;
034    
035    /**
036     */
037    public class AMQTransactionStore implements TransactionStore {
038    
039        protected Map<TransactionId, AMQTx> inflightTransactions = new LinkedHashMap<TransactionId, AMQTx>();
040        Map<TransactionId, AMQTx> preparedTransactions = new LinkedHashMap<TransactionId, AMQTx>();
041    
042        private final AMQPersistenceAdapter peristenceAdapter;
043        private boolean doingRecover;
044    
045        public AMQTransactionStore(AMQPersistenceAdapter adapter) {
046            this.peristenceAdapter = adapter;
047        }
048    
049        /**
050         * @throws IOException
051         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
052         */
053        public void prepare(TransactionId txid) throws IOException {
054            AMQTx tx = null;
055            synchronized (inflightTransactions) {
056                tx = inflightTransactions.remove(txid);
057            }
058            if (tx == null) {
059                return;
060            }
061            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true);
062            synchronized (preparedTransactions) {
063                preparedTransactions.put(txid, tx);
064            }
065        }
066    
067        /**
068         * @throws IOException
069         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
070         */
071        public void replayPrepare(TransactionId txid) throws IOException {
072            AMQTx tx = null;
073            synchronized (inflightTransactions) {
074                tx = inflightTransactions.remove(txid);
075            }
076            if (tx == null) {
077                return;
078            }
079            synchronized (preparedTransactions) {
080                preparedTransactions.put(txid, tx);
081            }
082        }
083    
084        public AMQTx getTx(TransactionId txid, Location location) {
085            AMQTx tx = null;
086            synchronized (inflightTransactions) {
087                tx = inflightTransactions.get(txid);
088                if (tx == null) {
089                    tx = new AMQTx(location);
090                    inflightTransactions.put(txid, tx);
091                }
092            }
093            return tx;
094        }
095    
096        /**
097         * @throws XAException
098         * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
099         */
100        public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
101            if (preCommit != null) {
102                preCommit.run();
103            }
104            AMQTx tx;
105            if (wasPrepared) {
106                synchronized (preparedTransactions) {
107                    tx = preparedTransactions.remove(txid);
108                }
109            } else {
110                synchronized (inflightTransactions) {
111                    tx = inflightTransactions.remove(txid);
112                }
113            }
114            if (tx == null) {
115                if (postCommit != null) {
116                    postCommit.run();
117                }
118                return;
119            }
120            if (txid.isXATransaction()) {
121                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared), true,true);
122            } else {
123                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true,true);
124            }
125            if (postCommit != null) {
126                postCommit.run();
127            }
128        }
129    
130        /**
131         * @throws XAException
132         * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
133         */
134        public AMQTx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
135            if (wasPrepared) {
136                synchronized (preparedTransactions) {
137                    return preparedTransactions.remove(txid);
138                }
139            } else {
140                synchronized (inflightTransactions) {
141                    return inflightTransactions.remove(txid);
142                }
143            }
144        }
145    
146        /**
147         * @throws IOException
148         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
149         */
150        public void rollback(TransactionId txid) throws IOException {
151            AMQTx tx = null;
152            synchronized (inflightTransactions) {
153                tx = inflightTransactions.remove(txid);
154            }
155            if (tx != null) {
156                synchronized (preparedTransactions) {
157                    tx = preparedTransactions.remove(txid);
158                }
159            }
160            if (tx != null) {
161                if (txid.isXATransaction()) {
162                    peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false), true,true);
163                } else {
164                    peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false), true,true);
165                }
166            }
167        }
168    
169        /**
170         * @throws IOException
171         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
172         */
173        public void replayRollback(TransactionId txid) throws IOException {
174            boolean inflight = false;
175            synchronized (inflightTransactions) {
176                inflight = inflightTransactions.remove(txid) != null;
177            }
178            if (inflight) {
179                synchronized (preparedTransactions) {
180                    preparedTransactions.remove(txid);
181                }
182            }
183        }
184    
185        public void start() throws Exception {
186        }
187    
188        public void stop() throws Exception {
189        }
190    
191        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
192            // All the in-flight transactions get rolled back..
193            synchronized (inflightTransactions) {
194                inflightTransactions.clear();
195            }
196            this.doingRecover = true;
197            try {
198                Map<TransactionId, AMQTx> txs = null;
199                synchronized (preparedTransactions) {
200                    txs = new LinkedHashMap<TransactionId, AMQTx>(preparedTransactions);
201                }
202                for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
203                    Object txid = iter.next();
204                    AMQTx tx = txs.get(txid);
205                    listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
206                }
207            } finally {
208                this.doingRecover = false;
209            }
210        }
211    
212        /**
213         * @param message
214         * @throws IOException
215         */
216        void addMessage(AMQMessageStore store, Message message, Location location) throws IOException {
217            AMQTx tx = getTx(message.getTransactionId(), location);
218            tx.add(store, message, location);
219        }
220    
221        /**
222         * @param ack
223         * @throws IOException
224         */
225        public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException {
226            AMQTx tx = getTx(ack.getTransactionId(), location);
227            tx.add(store, ack);
228        }
229    
230        public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) {
231            AMQTx tx = getTx(ack.getTransactionId(), location);
232            tx.add(store, ack);
233        }
234    
235        public Location checkpoint() throws IOException {
236            // Nothing really to checkpoint.. since, we don't
237            // checkpoint tx operations in to long term store until they are
238            // committed.
239            // But we keep track of the first location of an operation
240            // that was associated with an active tx. The journal can not
241            // roll over active tx records.
242            Location minimumLocationInUse = null;
243            synchronized (inflightTransactions) {
244                for (Iterator<AMQTx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
245                    AMQTx tx = iter.next();
246                    Location location = tx.getLocation();
247                    if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) {
248                        minimumLocationInUse = location;
249                    }
250                }
251            }
252            synchronized (preparedTransactions) {
253                for (Iterator<AMQTx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
254                    AMQTx tx = iter.next();
255                    Location location = tx.getLocation();
256                    if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) {
257                        minimumLocationInUse = location;
258                    }
259                }
260                return minimumLocationInUse;
261            }
262        }
263    
264        public boolean isDoingRecover() {
265            return doingRecover;
266        }
267    
268        /**
269         * @return the preparedTransactions
270         */
271        public Map<TransactionId, AMQTx> getPreparedTransactions() {
272            return this.preparedTransactions;
273        }
274    
275        /**
276         * @param preparedTransactions the preparedTransactions to set
277         */
278        public void setPreparedTransactions(Map<TransactionId, AMQTx> preparedTransactions) {
279            if (preparedTransactions != null) {
280                this.preparedTransactions.clear();
281                this.preparedTransactions.putAll(preparedTransactions);
282            }
283        }
284    }