001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.IOException;
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import org.apache.activemq.broker.ConnectionContext;
024 import org.apache.activemq.command.ActiveMQDestination;
025 import org.apache.activemq.command.Message;
026 import org.apache.activemq.command.MessageAck;
027 import org.apache.activemq.command.MessageId;
028 import org.apache.activemq.command.TransactionId;
029 import org.apache.activemq.command.XATransactionId;
030 import org.apache.activemq.store.MessageStore;
031 import org.apache.activemq.store.ProxyTopicMessageStore;
032 import org.apache.activemq.store.TopicMessageStore;
033 import org.apache.activemq.store.TransactionRecoveryListener;
034 import org.apache.activemq.store.memory.MemoryTransactionStore;
035 import org.apache.activemq.util.ByteSequence;
036 import org.apache.activemq.util.DataByteArrayInputStream;
037
/**
 * A transaction store that respects the two-phase commit (2PC) prepare phase.
 * <p>
 * Local transactions are used to maintain the prepared state: the xid column
 * provides a transaction flag for additions and removals. A commit clears
 * that context and completes the work; a rollback clears the flag and removes
 * the additions.
 * <p>
 * Essentially, a prepare is an insert and/or update transaction, and a commit
 * or rollback is an update and/or remove.
 */
047 public class JdbcMemoryTransactionStore extends MemoryTransactionStore {
048
049
// Delegate message stores keyed by destination: filled in by onProxyTopicStore
// and looked up by onRecovered to resolve the store of a recovered last-ack command.
private HashMap<ActiveMQDestination, MessageStore> topicStores = new HashMap<ActiveMQDestination, MessageStore>();
051
/**
 * Creates the transaction store on top of a JDBC persistence adapter.
 *
 * @param jdbcPersistenceAdapter the adapter passed on to the superclass constructor
 */
public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
    super(jdbcPersistenceAdapter);
}
055
056 @Override
057 public void prepare(TransactionId txid) throws IOException {
058 Tx tx = inflightTransactions.remove(txid);
059 if (tx == null) {
060 return;
061 }
062
063 ConnectionContext ctx = new ConnectionContext();
064 // setting the xid modifies the add/remove to be pending transaction outcome
065 ctx.setXid((XATransactionId) txid);
066 persistenceAdapter.beginTransaction(ctx);
067 try {
068
069 // Do all the message adds.
070 for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) {
071 AddMessageCommand cmd = iter.next();
072 cmd.run(ctx);
073 }
074 // And removes..
075 for (Iterator<RemoveMessageCommand> iter = tx.acks.iterator(); iter.hasNext();) {
076 RemoveMessageCommand cmd = iter.next();
077 cmd.run(ctx);
078 }
079
080 } catch ( IOException e ) {
081 persistenceAdapter.rollbackTransaction(ctx);
082 throw e;
083 }
084 persistenceAdapter.commitTransaction(ctx);
085
086 ctx.setXid(null);
087 // setup for commit outcome
088 ArrayList<AddMessageCommand> updateFromPreparedStateCommands = new ArrayList<AddMessageCommand>();
089 for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext();) {
090 final AddMessageCommand addMessageCommand = iter.next();
091 updateFromPreparedStateCommands.add(new AddMessageCommand() {
092 @Override
093 public Message getMessage() {
094 return addMessageCommand.getMessage();
095 }
096
097 @Override
098 public MessageStore getMessageStore() {
099 return addMessageCommand.getMessageStore();
100 }
101
102 @Override
103 public void run(ConnectionContext context) throws IOException {
104 JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) persistenceAdapter;
105 Message message = addMessageCommand.getMessage();
106 jdbcPersistenceAdapter.commitAdd(context, message.getMessageId());
107 ((JDBCMessageStore)addMessageCommand.getMessageStore()).onAdd(
108 message.getMessageId(),
109 (Long)message.getMessageId().getDataLocator(),
110 message.getPriority());
111
112 }
113 });
114 }
115 tx.messages = updateFromPreparedStateCommands;
116 preparedTransactions.put(txid, tx);
117
118 }
119
120
121 @Override
122 public void rollback(TransactionId txid) throws IOException {
123
124 Tx tx = inflightTransactions.remove(txid);
125 if (tx == null) {
126 tx = preparedTransactions.remove(txid);
127 if (tx != null) {
128 // undo prepare work
129 ConnectionContext ctx = new ConnectionContext();
130 persistenceAdapter.beginTransaction(ctx);
131 try {
132
133 for (Iterator<AddMessageCommand> iter = tx.messages.iterator(); iter.hasNext(); ) {
134 final Message message = iter.next().getMessage();
135 // need to delete the row
136 ((JDBCPersistenceAdapter) persistenceAdapter).commitRemove(ctx, new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, 1));
137 }
138
139 for (Iterator<RemoveMessageCommand> iter = tx.acks.iterator(); iter.hasNext(); ) {
140 RemoveMessageCommand removeMessageCommand = iter.next();
141 if (removeMessageCommand instanceof LastAckCommand ) {
142 ((LastAckCommand)removeMessageCommand).rollback(ctx);
143 } else {
144 // need to unset the txid flag on the existing row
145 ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(ctx,
146 removeMessageCommand.getMessageAck().getLastMessageId());
147 }
148 }
149 } catch (IOException e) {
150 persistenceAdapter.rollbackTransaction(ctx);
151 throw e;
152 }
153 persistenceAdapter.commitTransaction(ctx);
154 }
155 }
156 }
157
158 @Override
159 public void recover(TransactionRecoveryListener listener) throws IOException {
160 ((JDBCPersistenceAdapter)persistenceAdapter).recover(this);
161 super.recover(listener);
162 }
163
164 public void recoverAdd(long id, byte[] messageBytes) throws IOException {
165 final Message message = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(messageBytes));
166 message.getMessageId().setDataLocator(id);
167 Tx tx = getPreparedTx(message.getTransactionId());
168 tx.add(new AddMessageCommand() {
169 @Override
170 public Message getMessage() {
171 return message;
172 }
173
174 @Override
175 public MessageStore getMessageStore() {
176 return null;
177 }
178
179 @Override
180 public void run(ConnectionContext context) throws IOException {
181 ((JDBCPersistenceAdapter)persistenceAdapter).commitAdd(null, message.getMessageId());
182 }
183
184 });
185 }
186
187 public void recoverAck(long id, byte[] xid, byte[] message) throws IOException {
188 Message msg = (Message) ((JDBCPersistenceAdapter)persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(message));
189 msg.getMessageId().setDataLocator(id);
190 Tx tx = getPreparedTx(new XATransactionId(xid));
191 final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1);
192 tx.add(new RemoveMessageCommand() {
193 @Override
194 public MessageAck getMessageAck() {
195 return ack;
196 }
197
198 @Override
199 public void run(ConnectionContext context) throws IOException {
200 ((JDBCPersistenceAdapter)persistenceAdapter).commitRemove(context, ack);
201 }
202
203 @Override
204 public MessageStore getMessageStore() {
205 return null;
206 }
207
208 });
209
210 }
211
/**
 * A {@link RemoveMessageCommand} for a subscription's acknowledgement that can
 * also be undone: {@code rollback(TransactionId)} calls {@link #rollback} on
 * every prepared ack command of this type.
 */
interface LastAckCommand extends RemoveMessageCommand {
    /** Undoes the prepared acknowledgement using the given context. */
    void rollback(ConnectionContext context) throws IOException;

    /** Client id of the subscription. */
    String getClientId();

    /** Name of the subscription. */
    String getSubName();

    /** Sequence of the acknowledgement; implementations in this file may throw IllegalStateException instead. */
    long getSequence();

    /** Priority of the acknowledgement; implementations in this file may throw IllegalStateException instead. */
    byte getPriority();

    /** Supplies the topic message store; onRecovered calls this once the store has been looked up. */
    void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore);
}
225
226 public void recoverLastAck(byte[] encodedXid, final ActiveMQDestination destination, final String subName, final String clientId) throws IOException {
227 Tx tx = getPreparedTx(new XATransactionId(encodedXid));
228 DataByteArrayInputStream inputStream = new DataByteArrayInputStream(encodedXid);
229 inputStream.skipBytes(1); // +|-
230 final long lastAck = inputStream.readLong();
231 final byte priority = inputStream.readByte();
232 final MessageAck ack = new MessageAck();
233 ack.setDestination(destination);
234 tx.add(new LastAckCommand() {
235 JDBCTopicMessageStore jdbcTopicMessageStore;
236
237 @Override
238 public MessageAck getMessageAck() {
239 return ack;
240 }
241
242 @Override
243 public MessageStore getMessageStore() {
244 return jdbcTopicMessageStore;
245 }
246
247 @Override
248 public void run(ConnectionContext context) throws IOException {
249 ((JDBCPersistenceAdapter)persistenceAdapter).commitLastAck(context, lastAck, priority, destination, subName, clientId);
250 jdbcTopicMessageStore.complete(clientId, subName);
251 }
252
253 @Override
254 public void rollback(ConnectionContext context) throws IOException {
255 ((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, priority, jdbcTopicMessageStore.getDestination(), subName, clientId);
256 jdbcTopicMessageStore.complete(clientId, subName);
257 }
258
259 @Override
260 public String getClientId() {
261 return clientId;
262 }
263
264 @Override
265 public String getSubName() {
266 return subName;
267 }
268
269 @Override
270 public long getSequence() {
271 return lastAck;
272 }
273
274 @Override
275 public byte getPriority() {
276 return priority;
277 }
278
279 @Override
280 public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
281 this.jdbcTopicMessageStore = jdbcTopicMessageStore;
282 }
283 });
284
285 }
286
287 @Override
288 protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
289 topicStores.put(proxyTopicMessageStore.getDestination(), proxyTopicMessageStore.getDelegate());
290 }
291
292 @Override
293 protected void onRecovered(Tx tx) {
294 for (RemoveMessageCommand removeMessageCommand: tx.acks) {
295 if (removeMessageCommand instanceof LastAckCommand) {
296 LastAckCommand lastAckCommand = (LastAckCommand) removeMessageCommand;
297 JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination());
298 jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority());
299 lastAckCommand.setMessageStore(jdbcTopicMessageStore);
300 } else {
301 // when reading the store we ignore messages with non null XIDs but should include those with XIDS starting in - (pending acks in an xa transaction),
302 // but the sql is non portable to match BLOB with LIKE etc
303 // so we make up for it when we recover the ack
304 ((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
305 }
306 }
307 }
308
309 @Override
310 public void acknowledge(final TopicMessageStore topicMessageStore, final String clientId, final String subscriptionName,
311 final MessageId messageId, final MessageAck ack) throws IOException {
312
313 if (ack.isInTransaction()) {
314 Tx tx = getTx(ack.getTransactionId());
315 tx.add(new LastAckCommand() {
316 public MessageAck getMessageAck() {
317 return ack;
318 }
319
320 public void run(ConnectionContext ctx) throws IOException {
321 topicMessageStore.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
322 }
323
324 @Override
325 public MessageStore getMessageStore() {
326 return topicMessageStore;
327 }
328
329 @Override
330 public void rollback(ConnectionContext context) throws IOException {
331 JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore)topicMessageStore;
332 ((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context,
333 jdbcTopicMessageStore,
334 ack,
335 subscriptionName, clientId);
336 jdbcTopicMessageStore.complete(clientId, subscriptionName);
337 }
338
339
340 @Override
341 public String getClientId() {
342 return clientId;
343 }
344
345 @Override
346 public String getSubName() {
347 return subscriptionName;
348 }
349
350 @Override
351 public long getSequence() {
352 throw new IllegalStateException("Sequence id must be inferred from ack");
353 }
354
355 @Override
356 public byte getPriority() {
357 throw new IllegalStateException("Priority must be inferred from ack or row");
358 }
359
360 @Override
361 public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
362 throw new IllegalStateException("message store already known!");
363 }
364 });
365 } else {
366 topicMessageStore.acknowledge(null, clientId, subscriptionName, messageId, ack);
367 }
368 }
369
370 }