001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.kahadaptor;
018    
019    import java.io.IOException;
020    import java.util.Iterator;
021    import java.util.Map;
022    import java.util.Map.Entry;
023    import java.util.concurrent.ConcurrentHashMap;
024    
025    import org.apache.activemq.broker.BrokerService;
026    import org.apache.activemq.broker.BrokerServiceAware;
027    import org.apache.activemq.broker.ConnectionContext;
028    import org.apache.activemq.command.Message;
029    import org.apache.activemq.command.MessageAck;
030    import org.apache.activemq.command.MessageId;
031    import org.apache.activemq.command.TransactionId;
032    import org.apache.activemq.command.XATransactionId;
033    import org.apache.activemq.kaha.RuntimeStoreException;
034    import org.apache.activemq.store.MessageStore;
035    import org.apache.activemq.store.ProxyMessageStore;
036    import org.apache.activemq.store.ProxyTopicMessageStore;
037    import org.apache.activemq.store.TopicMessageStore;
038    import org.apache.activemq.store.TransactionRecoveryListener;
039    import org.apache.activemq.store.TransactionStore;
040    import org.slf4j.Logger;
041    import org.slf4j.LoggerFactory;
042    
043    /**
044     * Provides a TransactionStore implementation that can create transaction aware
045     * MessageStore objects from non transaction aware MessageStore objects.
046     *
047     *
048     */
049    public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {
050        private static final Logger LOG = LoggerFactory.getLogger(KahaTransactionStore.class);
051    
052        private final Map transactions = new ConcurrentHashMap();
053        private final Map prepared;
054        private final KahaPersistenceAdapter adaptor;
055    
056        private BrokerService brokerService;
057    
058        KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
059            this.adaptor = adaptor;
060            this.prepared = preparedMap;
061        }
062    
063        public MessageStore proxy(MessageStore messageStore) {
064            return new ProxyMessageStore(messageStore) {
065                @Override
066                public void addMessage(ConnectionContext context, final Message send) throws IOException {
067                    KahaTransactionStore.this.addMessage(getDelegate(), send);
068                }
069    
070                @Override
071                public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException {
072                    KahaTransactionStore.this.addMessage(getDelegate(), send);
073                }
074    
075                @Override
076                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
077                    KahaTransactionStore.this.removeMessage(getDelegate(), ack);
078                }
079            };
080        }
081    
082        public TopicMessageStore proxy(TopicMessageStore messageStore) {
083            return new ProxyTopicMessageStore(messageStore) {
084                @Override
085                public void addMessage(ConnectionContext context, final Message send) throws IOException {
086                    KahaTransactionStore.this.addMessage(getDelegate(), send);
087                }
088    
089                @Override
090                public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
091                    KahaTransactionStore.this.removeMessage(getDelegate(), ack);
092                }
093    
094                @Override
095                public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
096                                MessageId messageId, MessageAck ack) throws IOException {
097                    KahaTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, subscriptionName, messageId, ack);
098                }
099            };
100        }
101    
102        /**
103         * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
104         */
105        public void prepare(TransactionId txid) {
106            KahaTransaction tx = getTx(txid);
107            if (tx != null) {
108                tx.prepare();
109                prepared.put(txid, tx);
110            }
111        }
112    
113        public void commit(TransactionId txid, boolean wasPrepared, Runnable before,Runnable after) throws IOException {
114            if(before != null) {
115                before.run();
116            }
117            KahaTransaction tx = getTx(txid);
118            if (tx != null) {
119                tx.commit(this);
120                removeTx(txid);
121            }
122            if (after != null) {
123                after.run();
124            }
125        }
126    
127        /**
128         * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
129         */
130        public void rollback(TransactionId txid) {
131            KahaTransaction tx = getTx(txid);
132            if (tx != null) {
133                tx.rollback();
134                removeTx(txid);
135            }
136        }
137    
138        public void start() throws Exception {
139        }
140    
141        public void stop() throws Exception {
142        }
143    
144        public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
145            for (Iterator i = prepared.entrySet().iterator(); i.hasNext();) {
146                Map.Entry entry = (Entry)i.next();
147                XATransactionId xid = (XATransactionId)entry.getKey();
148                KahaTransaction kt = (KahaTransaction)entry.getValue();
149                listener.recover(xid, kt.getMessages(), kt.getAcks());
150            }
151        }
152    
153        /**
154         * @param message
155         * @throws IOException
156         */
157        void addMessage(final MessageStore destination, final Message message) throws IOException {
158            try {
159                if (message.isInTransaction()) {
160                    KahaTransaction tx = getOrCreateTx(message.getTransactionId());
161                    tx.add((KahaMessageStore)destination, message);
162                } else {
163                    destination.addMessage(null, message);
164                }
165            } catch (RuntimeStoreException rse) {
166                if (rse.getCause() instanceof IOException) {
167                    brokerService.handleIOException((IOException)rse.getCause());
168                }
169                throw rse;
170            }
171        }
172    
173        /**
174         * @param ack
175         * @throws IOException
176         */
177        final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
178            try {
179                if (ack.isInTransaction()) {
180                    KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
181                    tx.add((KahaMessageStore)destination, ack);
182                } else {
183                    destination.removeMessage(null, ack);
184                }
185            } catch (RuntimeStoreException rse) {
186                if (rse.getCause() instanceof IOException) {
187                    brokerService.handleIOException((IOException)rse.getCause());
188                }
189                throw rse;
190            }
191        }
192    
193        final void acknowledge(final TopicMessageStore destination, String clientId,
194                               String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
195            try {
196                if (ack.isInTransaction()) {
197                    KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
198                    tx.add((KahaMessageStore)destination, clientId, subscriptionName, messageId, ack);
199                } else {
200                    destination.acknowledge(null, clientId, subscriptionName, messageId, ack);
201                }
202            } catch (RuntimeStoreException rse) {
203                if (rse.getCause() instanceof IOException) {
204                    brokerService.handleIOException((IOException)rse.getCause());
205                }
206                throw rse;
207            }
208        }
209    
210        protected synchronized KahaTransaction getTx(TransactionId key) {
211            KahaTransaction result = (KahaTransaction)transactions.get(key);
212            if (result == null) {
213                result = (KahaTransaction)prepared.get(key);
214            }
215            return result;
216        }
217    
218        protected synchronized KahaTransaction getOrCreateTx(TransactionId key) {
219            KahaTransaction result = (KahaTransaction)transactions.get(key);
220            if (result == null) {
221                result = new KahaTransaction();
222                transactions.put(key, result);
223            }
224            return result;
225        }
226    
227        protected synchronized void removeTx(TransactionId key) {
228            transactions.remove(key);
229            prepared.remove(key);
230        }
231    
232        public void delete() {
233            transactions.clear();
234            prepared.clear();
235        }
236    
237        protected MessageStore getStoreById(Object id) {
238            return adaptor.retrieveMessageStore(id);
239        }
240    
241        public void setBrokerService(BrokerService brokerService) {
242            this.brokerService = brokerService;
243        }
244    }