001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.IOException;
020    import java.util.ArrayList;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import org.apache.activemq.broker.ConnectionContext;
024    import org.apache.activemq.command.ActiveMQDestination;
025    import org.apache.activemq.command.Message;
026    import org.apache.activemq.command.MessageAck;
027    import org.apache.activemq.command.MessageId;
028    import org.apache.activemq.command.TransactionId;
029    import org.apache.activemq.command.XATransactionId;
030    import org.apache.activemq.store.MessageStore;
031    import org.apache.activemq.store.ProxyTopicMessageStore;
032    import org.apache.activemq.store.TopicMessageStore;
033    import org.apache.activemq.store.TransactionRecoveryListener;
034    import org.apache.activemq.store.memory.MemoryTransactionStore;
035    import org.apache.activemq.util.ByteSequence;
036    import org.apache.activemq.util.DataByteArrayInputStream;
037    
038    /**
039     * respect 2pc prepare
040     * uses local transactions to maintain prepared state
041     * xid column provides transaction flag for additions and removals
042     * a commit clears that context and completes the work
043     * a rollback clears the flag and removes the additions
044     * Essentially a prepare is an insert &| update transaction
045     *  commit|rollback is an update &| remove
046     */
047    public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
048    
049    
050        private HashMap<ActiveMQDestination, MessageStore> topicStores = new HashMap<ActiveMQDestination, MessageStore>();
051    
052        public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
053            super(jdbcPersistenceAdapter);
054        }
055    
056        @Override
057        public void prepare(TransactionId txid) throws IOException {
058            Tx tx = inflightTransactions.remove(txid);
059            if (tx == null) {
060                return;
061            }
062    
063            ConnectionContext ctx = new ConnectionContext();
064            // setting the xid modifies the add/remove to be pending transaction outcome
065            ctx.setXid((XATransactionId) txid);
066            persistenceAdapter.beginTransaction(ctx);
067            try {
068    
069                // Do all the message adds.
070                for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) {
071                    AddMessageCommand cmd = iter.next();
072                    cmd.run(ctx);
073                }
074                // And removes..
075                for (Iterator<RemoveMessageCommand> iter = tx.acks.iterator(); iter.hasNext();) {
076                    RemoveMessageCommand cmd = iter.next();
077                    cmd.run(ctx);
078                }
079    
080            } catch ( IOException e ) {
081                persistenceAdapter.rollbackTransaction(ctx);
082                throw e;
083            }
084            persistenceAdapter.commitTransaction(ctx);
085    
086            ctx.setXid(null);
087            // setup for commit outcome
088            ArrayList<AddMessageCommand> updateFromPreparedStateCommands = new ArrayList<AddMessageCommand>();
089            for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) {
090                final AddMessageCommand addMessageCommand = iter.next();
091                updateFromPreparedStateCommands.add(new AddMessageCommand() {
092                    @Override
093                    public Message getMessage() {
094                        return addMessageCommand.getMessage();
095                    }
096    
097                    @Override
098                    public MessageStore getMessageStore() {
099                        return addMessageCommand.getMessageStore();
100                    }
101    
102                    @Override
103                    public void run(ConnectionContext context) throws IOException {
104                        JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) persistenceAdapter;
105                        Message message = addMessageCommand.getMessage();
106                        jdbcPersistenceAdapter.commitAdd(context, message.getMessageId());
107                        ((JDBCMessageStore)addMessageCommand.getMessageStore()).onAdd(
108                                message.getMessageId(),
109                                (Long)message.getMessageId().getDataLocator(),
110                                message.getPriority());
111    
112                    }
113                });
114            }
115            tx.messages = updateFromPreparedStateCommands;
116            preparedTransactions.put(txid, tx);
117    
118        }
119    
120    
121        @Override
122        public void rollback(TransactionId txid) throws IOException {
123    
124            Tx tx = inflightTransactions.remove(txid);
125            if (tx == null) {
126                tx = preparedTransactions.remove(txid);
127                if (tx != null) {
128                    // undo prepare work
129                    ConnectionContext ctx = new ConnectionContext();
130                    persistenceAdapter.beginTransaction(ctx);
131                    try {
132    
133                        for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext(); ) {
134                            final Message message = iter.next().getMessage();
135                            // need to delete the row
136                            ((JDBCPersistenceAdapter) persistenceAdapter).commitRemove(ctx, new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, 1));
137                        }
138    
139                        for (Iterator<RemoveMessageCommand> iter = tx.acks.iterator(); iter.hasNext(); ) {
140                            RemoveMessageCommand removeMessageCommand = iter.next();
141                            if (removeMessageCommand instanceof LastAckCommand ) {
142                                ((LastAckCommand)removeMessageCommand).rollback(ctx);
143                            } else {
144                                // need to unset the txid flag on the existing row
145                                ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx,
146                                        removeMessageCommand.getMessageAck().getLastMessageId());
147                            }
148                        }
149                    } catch (IOException e) {
150                        persistenceAdapter.rollbackTransaction(ctx);
151                        throw e;
152                    }
153                    persistenceAdapter.commitTransaction(ctx);
154                }
155            }
156        }
157    
158        @Override
159        public void recover(TransactionRecoveryListener listener) throws IOException {
160            ((JDBCPersistenceAdapter)persistenceAdapter).recover(this);
161            super.recover(listener);
162        }
163    
164        public void recoverAdd(long id, byte[] messageBytes) throws IOException {
165            final Message message = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(messageBytes));
166            message.getMessageId().setDataLocator(id);
167            Tx tx = getPreparedTx(message.getTransactionId());
168            tx.add(new AddMessageCommand() {
169                @Override
170                public Message getMessage() {
171                    return message;
172                }
173    
174                @Override
175                public MessageStore getMessageStore() {
176                    return null;
177                }
178    
179                @Override
180                public void run(ConnectionContext context) throws IOException {
181                    ((JDBCPersistenceAdapter)persistenceAdapter).commitAdd(null, message.getMessageId());
182                }
183    
184            });
185        }
186    
187        public void recoverAck(long id, byte[] xid, byte[] message) throws IOException {
188            Message msg = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(message));
189            msg.getMessageId().setDataLocator(id);
190            Tx tx = getPreparedTx(new XATransactionId(xid));
191            final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1);
192            tx.add(new RemoveMessageCommand() {
193                @Override
194                public MessageAck getMessageAck() {
195                    return ack;
196                }
197    
198                @Override
199                public void run(ConnectionContext context) throws IOException {
200                    ((JDBCPersistenceAdapter)persistenceAdapter).commitRemove(context, ack);
201                }
202    
203                @Override
204                public MessageStore getMessageStore() {
205                    return null;
206                }
207    
208            });
209    
210        }
211    
212        interface LastAckCommand extends RemoveMessageCommand {
213            void rollback(ConnectionContext context) throws IOException;
214    
215            String getClientId();
216    
217            String getSubName();
218    
219            long getSequence();
220    
221            byte getPriority();
222    
223            void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore);
224        }
225    
226        public void recoverLastAck(byte[] encodedXid, final ActiveMQDestination destination, final String subName, final String clientId) throws IOException {
227            Tx tx = getPreparedTx(new XATransactionId(encodedXid));
228            DataByteArrayInputStream inputStream = new DataByteArrayInputStream(encodedXid);
229            inputStream.skipBytes(1); // +|-
230            final long lastAck = inputStream.readLong();
231            final byte priority = inputStream.readByte();
232            final MessageAck ack = new MessageAck();
233            ack.setDestination(destination);
234            tx.add(new LastAckCommand() {
235                JDBCTopicMessageStore jdbcTopicMessageStore;
236    
237                @Override
238                public MessageAck getMessageAck() {
239                    return ack;
240                }
241    
242                @Override
243                public MessageStore getMessageStore() {
244                    return jdbcTopicMessageStore;
245                }
246    
247                @Override
248                public void run(ConnectionContext context) throws IOException {
249                    ((JDBCPersistenceAdapter)persistenceAdapter).commitLastAck(context, lastAck, priority, destination, subName, clientId);
250                    jdbcTopicMessageStore.complete(clientId, subName);
251                }
252    
253                @Override
254                public void rollback(ConnectionContext context) throws IOException {
255                    ((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, priority, jdbcTopicMessageStore.getDestination(), subName, clientId);
256                    jdbcTopicMessageStore.complete(clientId, subName);
257                }
258    
259                @Override
260                public String getClientId() {
261                    return clientId;
262                }
263    
264                @Override
265                public String getSubName() {
266                    return subName;
267                }
268    
269                @Override
270                public long getSequence() {
271                    return lastAck;
272                }
273    
274                @Override
275                public byte getPriority() {
276                    return priority;
277                }
278    
279                @Override
280                public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
281                    this.jdbcTopicMessageStore = jdbcTopicMessageStore;
282                }
283            });
284    
285        }
286    
287        @Override
288        protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
289            topicStores.put(proxyTopicMessageStore.getDestination(), proxyTopicMessageStore.getDelegate());
290        }
291    
292        @Override
293        protected void onRecovered(Tx tx) {
294            for (RemoveMessageCommand removeMessageCommand: tx.acks) {
295                if (removeMessageCommand instanceof LastAckCommand) {
296                    LastAckCommand lastAckCommand = (LastAckCommand) removeMessageCommand;
297                    JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination());
298                    jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority());
299                    lastAckCommand.setMessageStore(jdbcTopicMessageStore);
300                } else {
301                    // when reading the store we ignore messages with non null XIDs but should include those with XIDS starting in - (pending acks in an xa transaction),
302                    // but the sql is non portable to match BLOB with LIKE etc
303                    // so we make up for it when we recover the ack
304                    ((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
305                }
306            }
307        }
308    
309        @Override
310        public void acknowledge(final TopicMessageStore topicMessageStore, final String clientId, final String subscriptionName,
311                               final MessageId messageId, final MessageAck ack) throws IOException {
312    
313            if (ack.isInTransaction()) {
314                Tx tx = getTx(ack.getTransactionId());
315                tx.add(new LastAckCommand() {
316                    public MessageAck getMessageAck() {
317                        return ack;
318                    }
319    
320                    public void run(ConnectionContext ctx) throws IOException {
321                        topicMessageStore.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
322                    }
323    
324                    @Override
325                    public MessageStore getMessageStore() {
326                        return topicMessageStore;
327                    }
328    
329                    @Override
330                    public void rollback(ConnectionContext context) throws IOException {
331                        JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore)topicMessageStore;
332                        ((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context,
333                                jdbcTopicMessageStore,
334                                ack,
335                                subscriptionName, clientId);
336                        jdbcTopicMessageStore.complete(clientId, subscriptionName);
337                    }
338    
339    
340                    @Override
341                    public String getClientId() {
342                        return clientId;
343                    }
344    
345                    @Override
346                    public String getSubName() {
347                        return subscriptionName;
348                    }
349    
350                    @Override
351                    public long getSequence() {
352                        throw new IllegalStateException("Sequence id must be inferred from ack");
353                    }
354    
355                    @Override
356                    public byte getPriority() {
357                        throw new IllegalStateException("Priority must be inferred from ack or row");
358                    }
359    
360                    @Override
361                    public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
362                        throw new IllegalStateException("message store already known!");
363                    }
364                });
365            } else {
366                topicMessageStore.acknowledge(null, clientId, subscriptionName, messageId, ack);
367            }
368        }
369    
370    }