001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.Iterator;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.command.ActiveMQDestination;
025import org.apache.activemq.command.Message;
026import org.apache.activemq.command.MessageAck;
027import org.apache.activemq.command.MessageId;
028import org.apache.activemq.command.TransactionId;
029import org.apache.activemq.command.XATransactionId;
030import org.apache.activemq.store.IndexListener;
031import org.apache.activemq.store.MessageStore;
032import org.apache.activemq.store.ProxyMessageStore;
033import org.apache.activemq.store.ProxyTopicMessageStore;
034import org.apache.activemq.store.TopicMessageStore;
035import org.apache.activemq.store.TransactionRecoveryListener;
036import org.apache.activemq.store.memory.MemoryTransactionStore;
037import org.apache.activemq.util.ByteSequence;
038import org.apache.activemq.util.DataByteArrayInputStream;
039
040/**
041 * respect 2pc prepare
042 * uses local transactions to maintain prepared state
043 * xid column provides transaction flag for additions and removals
044 * a commit clears that context and completes the work
045 * a rollback clears the flag and removes the additions
046 * Essentially a prepare is an insert &| update transaction
047 *  commit|rollback is an update &| remove
048 */
049public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
050
051
052    private HashMap<ActiveMQDestination, MessageStore> topicStores = new HashMap<ActiveMQDestination, MessageStore>();
053    private HashMap<ActiveMQDestination, MessageStore> queueStores = new HashMap<ActiveMQDestination, MessageStore>();
054
055    public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
056        super(jdbcPersistenceAdapter);
057    }
058
059    @Override
060    public void prepare(TransactionId txid) throws IOException {
061        Tx tx = inflightTransactions.remove(txid);
062        if (tx == null) {
063            return;
064        }
065
066        ConnectionContext ctx = new ConnectionContext();
067        // setting the xid modifies the add/remove to be pending transaction outcome
068        ctx.setXid((XATransactionId) txid);
069        persistenceAdapter.beginTransaction(ctx);
070        try {
071
072            // Do all the message adds.
073            for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) {
074                AddMessageCommand cmd = iter.next();
075                cmd.run(ctx);
076            }
077            // And removes..
078            for (Iterator<RemoveMessageCommand> iter = tx.acks.iterator(); iter.hasNext();) {
079                RemoveMessageCommand cmd = iter.next();
080                cmd.run(ctx);
081            }
082
083        } catch ( IOException e ) {
084            persistenceAdapter.rollbackTransaction(ctx);
085            throw e;
086        }
087        persistenceAdapter.commitTransaction(ctx);
088
089        ctx.setXid(null);
090        // setup for commit outcome
091        ArrayList<AddMessageCommand> updateFromPreparedStateCommands = new ArrayList<AddMessageCommand>();
092        for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) {
093            final AddMessageCommand addMessageCommand = iter.next();
094            updateFromPreparedStateCommands.add(new CommitAddOutcome(addMessageCommand));
095        }
096        tx.messages = updateFromPreparedStateCommands;
097        preparedTransactions.put(txid, tx);
098
099    }
100
101
102    class CommitAddOutcome implements AddMessageCommand {
103        final Message message;
104        JDBCMessageStore jdbcMessageStore;
105
106        public CommitAddOutcome(JDBCMessageStore jdbcMessageStore, Message message) {
107            this.jdbcMessageStore = jdbcMessageStore;
108            this.message = message;
109        }
110
111        public CommitAddOutcome(AddMessageCommand addMessageCommand) {
112            this((JDBCMessageStore)addMessageCommand.getMessageStore(), addMessageCommand.getMessage());
113        }
114
115        @Override
116        public Message getMessage() {
117            return message;
118        }
119
120        @Override
121        public MessageStore getMessageStore() {
122            return jdbcMessageStore;
123        }
124
125        @Override
126        public void run(final ConnectionContext context) throws IOException {
127            JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) persistenceAdapter;
128            final Long preparedEntrySequence = (Long) message.getMessageId().getEntryLocator();
129            TransactionContext c = jdbcPersistenceAdapter.getTransactionContext(context);
130
131            synchronized (jdbcMessageStore.pendingAdditions) {
132                message.getMessageId().setEntryLocator(jdbcPersistenceAdapter.getNextSequenceId());
133
134                c.onCompletion(new Runnable() {
135                    @Override
136                    public void run() {
137                        message.getMessageId().setFutureOrSequenceLong(message.getMessageId().getEntryLocator());
138                    }
139                });
140
141                if (jdbcMessageStore.getIndexListener() != null) {
142                    jdbcMessageStore.getIndexListener().onAdd(new IndexListener.MessageContext(context, message, null));
143                }
144            }
145
146            jdbcPersistenceAdapter.commitAdd(context, message.getMessageId(), preparedEntrySequence);
147            jdbcMessageStore.onAdd(message, (Long)message.getMessageId().getEntryLocator(), message.getPriority());
148        }
149
150        @Override
151        public void setMessageStore(MessageStore messageStore) {
152            jdbcMessageStore = (JDBCMessageStore) messageStore;
153        }
154    }
155
156    @Override
157    public void rollback(TransactionId txid) throws IOException {
158
159        Tx tx = inflightTransactions.remove(txid);
160        if (tx == null) {
161            tx = preparedTransactions.remove(txid);
162            if (tx != null) {
163                // undo prepare work
164                ConnectionContext ctx = new ConnectionContext();
165                persistenceAdapter.beginTransaction(ctx);
166                try {
167
168                    for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext(); ) {
169                        final Message message = iter.next().getMessage();
170                        // need to delete the row
171                        ((JDBCPersistenceAdapter) persistenceAdapter).commitRemove(ctx, new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, 1));
172                    }
173
174                    for (Iterator<RemoveMessageCommand> iter = tx.acks.iterator(); iter.hasNext(); ) {
175                        RemoveMessageCommand removeMessageCommand = iter.next();
176                        if (removeMessageCommand instanceof LastAckCommand ) {
177                            ((LastAckCommand)removeMessageCommand).rollback(ctx);
178                        } else {
179                            MessageId messageId = removeMessageCommand.getMessageAck().getLastMessageId();
180                            // need to unset the txid flag on the existing row
181                            ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx, messageId, (Long)messageId.getEntryLocator());
182                        }
183                    }
184                } catch (IOException e) {
185                    persistenceAdapter.rollbackTransaction(ctx);
186                    throw e;
187                }
188                persistenceAdapter.commitTransaction(ctx);
189            }
190        }
191    }
192
193    @Override
194    public void recover(TransactionRecoveryListener listener) throws IOException {
195        ((JDBCPersistenceAdapter)persistenceAdapter).recover(this);
196        super.recover(listener);
197    }
198
199    public void recoverAdd(long id, byte[] messageBytes) throws IOException {
200        final Message message = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(messageBytes));
201        message.getMessageId().setFutureOrSequenceLong(id);
202        message.getMessageId().setEntryLocator(id);
203        Tx tx = getPreparedTx(message.getTransactionId());
204        tx.add(new CommitAddOutcome(null, message));
205    }
206
207    public void recoverAck(long id, byte[] xid, byte[] message) throws IOException {
208        Message msg = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(message));
209        msg.getMessageId().setFutureOrSequenceLong(id);
210        msg.getMessageId().setEntryLocator(id);
211        Tx tx = getPreparedTx(new XATransactionId(xid));
212        final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1);
213        tx.add(new RemoveMessageCommand() {
214            @Override
215            public MessageAck getMessageAck() {
216                return ack;
217            }
218
219            @Override
220            public void run(ConnectionContext context) throws IOException {
221                ((JDBCPersistenceAdapter)persistenceAdapter).commitRemove(context, ack);
222            }
223
224            @Override
225            public MessageStore getMessageStore() {
226                return null;
227            }
228
229        });
230
231    }
232
233    interface LastAckCommand extends RemoveMessageCommand {
234        void rollback(ConnectionContext context) throws IOException;
235
236        String getClientId();
237
238        String getSubName();
239
240        long getSequence();
241
242        byte getPriority();
243
244        void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore);
245    }
246
247    public void recoverLastAck(byte[] encodedXid, final ActiveMQDestination destination, final String subName, final String clientId) throws IOException {
248        Tx tx = getPreparedTx(new XATransactionId(encodedXid));
249        DataByteArrayInputStream inputStream = new DataByteArrayInputStream(encodedXid);
250        inputStream.skipBytes(1); // +|-
251        final long lastAck = inputStream.readLong();
252        final byte priority = inputStream.readByte();
253        final MessageAck ack = new MessageAck();
254        ack.setDestination(destination);
255        tx.add(new LastAckCommand() {
256            JDBCTopicMessageStore jdbcTopicMessageStore;
257
258            @Override
259            public MessageAck getMessageAck() {
260                return ack;
261            }
262
263            @Override
264            public MessageStore getMessageStore() {
265                return jdbcTopicMessageStore;
266            }
267
268            @Override
269            public void run(ConnectionContext context) throws IOException {
270                ((JDBCPersistenceAdapter)persistenceAdapter).commitLastAck(context, lastAck, priority, destination, subName, clientId);
271                jdbcTopicMessageStore.complete(clientId, subName);
272            }
273
274            @Override
275            public void rollback(ConnectionContext context) throws IOException {
276                ((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, priority, jdbcTopicMessageStore.getDestination(), subName, clientId);
277                jdbcTopicMessageStore.complete(clientId, subName);
278            }
279
280            @Override
281            public String getClientId() {
282                return clientId;
283            }
284
285            @Override
286            public String getSubName() {
287                return subName;
288            }
289
290            @Override
291            public long getSequence() {
292                return lastAck;
293            }
294
295            @Override
296            public byte getPriority() {
297                return priority;
298            }
299
300            @Override
301            public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
302                this.jdbcTopicMessageStore = jdbcTopicMessageStore;
303            }
304        });
305
306    }
307
308    @Override
309    protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
310        topicStores.put(proxyTopicMessageStore.getDestination(), proxyTopicMessageStore.getDelegate());
311    }
312
313    @Override
314    protected void onProxyQueueStore(ProxyMessageStore proxyQueueMessageStore) {
315        queueStores.put(proxyQueueMessageStore.getDestination(), proxyQueueMessageStore.getDelegate());
316    }
317
318    @Override
319    protected void onRecovered(Tx tx) {
320        for (RemoveMessageCommand removeMessageCommand: tx.acks) {
321            if (removeMessageCommand instanceof LastAckCommand) {
322                LastAckCommand lastAckCommand = (LastAckCommand) removeMessageCommand;
323                JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination());
324                jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority());
325                lastAckCommand.setMessageStore(jdbcTopicMessageStore);
326            } else {
327                // when reading the store we ignore messages with non null XIDs but should include those with XIDS starting in - (pending acks in an xa transaction),
328                // but the sql is non portable to match BLOB with LIKE etc
329                // so we make up for it when we recover the ack
330                ((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
331            }
332        }
333        for (AddMessageCommand addMessageCommand : tx.messages) {
334            ActiveMQDestination destination = addMessageCommand.getMessage().getDestination();
335            addMessageCommand.setMessageStore(destination.isQueue() ? queueStores.get(destination) : topicStores.get(destination));
336        }
337    }
338
339    @Override
340    public void acknowledge(final TopicMessageStore topicMessageStore, final String clientId, final String subscriptionName,
341                           final MessageId messageId, final MessageAck ack) throws IOException {
342
343        if (ack.isInTransaction()) {
344            Tx tx = getTx(ack.getTransactionId());
345            tx.add(new LastAckCommand() {
346                public MessageAck getMessageAck() {
347                    return ack;
348                }
349
350                public void run(ConnectionContext ctx) throws IOException {
351                    topicMessageStore.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
352                }
353
354                @Override
355                public MessageStore getMessageStore() {
356                    return topicMessageStore;
357                }
358
359                @Override
360                public void rollback(ConnectionContext context) throws IOException {
361                    JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore)topicMessageStore;
362                    ((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context,
363                            jdbcTopicMessageStore,
364                            ack,
365                            subscriptionName, clientId);
366                    jdbcTopicMessageStore.complete(clientId, subscriptionName);
367                }
368
369
370                @Override
371                public String getClientId() {
372                    return clientId;
373                }
374
375                @Override
376                public String getSubName() {
377                    return subscriptionName;
378                }
379
380                @Override
381                public long getSequence() {
382                    throw new IllegalStateException("Sequence id must be inferred from ack");
383                }
384
385                @Override
386                public byte getPriority() {
387                    throw new IllegalStateException("Priority must be inferred from ack or row");
388                }
389
390                @Override
391                public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
392                    throw new IllegalStateException("message store already known!");
393                }
394            });
395        } else {
396            topicMessageStore.acknowledge(null, clientId, subscriptionName, messageId, ack);
397        }
398    }
399
400}