001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc.adapter;
018
019 import java.io.IOException;
020 import java.sql.PreparedStatement;
021 import java.sql.ResultSet;
022 import java.sql.SQLException;
023 import java.sql.Statement;
024 import java.util.ArrayList;
025 import java.util.HashSet;
026 import java.util.LinkedList;
027 import java.util.Set;
028 import java.util.concurrent.locks.ReadWriteLock;
029 import java.util.concurrent.locks.ReentrantReadWriteLock;
030
031 import org.apache.activemq.broker.region.BaseDestination;
032 import org.apache.activemq.command.ActiveMQDestination;
033 import org.apache.activemq.command.MessageId;
034 import org.apache.activemq.command.ProducerId;
035 import org.apache.activemq.command.SubscriptionInfo;
036 import org.apache.activemq.command.XATransactionId;
037 import org.apache.activemq.store.jdbc.JDBCAdapter;
038 import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
039 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
040 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
041 import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
042 import org.apache.activemq.store.jdbc.Statements;
043 import org.apache.activemq.store.jdbc.TransactionContext;
044 import org.apache.activemq.util.DataByteArrayOutputStream;
045 import org.slf4j.Logger;
046 import org.slf4j.LoggerFactory;
047
048 /**
049 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
050 * encouraged to override the default implementation of methods to account for differences in JDBC Driver
051 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
052 * The databases/JDBC drivers that use this adapter are:
053 * <ul>
054 * <li></li>
055 * </ul>
056 *
057 * @org.apache.xbean.XBean element="defaultJDBCAdapter"
058 *
059 *
060 */
061 public class DefaultJDBCAdapter implements JDBCAdapter {
062 private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
063 public static final int MAX_ROWS = BaseDestination.MAX_PAGE_SIZE;
064 protected Statements statements;
065 protected boolean batchStatments = true;
066 protected boolean prioritizedMessages;
067 protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
068 // needs to be min twice the prefetch for a durable sub and large enough for selector range
069 protected int maxRows = MAX_ROWS;
070
071 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
072 s.setBytes(index, data);
073 }
074
075 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
076 return rs.getBytes(index);
077 }
078
079 public void doCreateTables(TransactionContext c) throws SQLException, IOException {
080 Statement s = null;
081 cleanupExclusiveLock.writeLock().lock();
082 try {
083 // Check to see if the table already exists. If it does, then don't
084 // log warnings during startup.
085 // Need to run the scripts anyways since they may contain ALTER
086 // statements that upgrade a previous version
087 // of the table
088 boolean alreadyExists = false;
089 ResultSet rs = null;
090 try {
091 rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
092 new String[] { "TABLE" });
093 alreadyExists = rs.next();
094 } catch (Throwable ignore) {
095 } finally {
096 close(rs);
097 }
098 s = c.getConnection().createStatement();
099 String[] createStatments = this.statements.getCreateSchemaStatements();
100 for (int i = 0; i < createStatments.length; i++) {
101 // This will fail usually since the tables will be
102 // created already.
103 try {
104 LOG.debug("Executing SQL: " + createStatments[i]);
105 s.execute(createStatments[i]);
106 } catch (SQLException e) {
107 if (alreadyExists) {
108 LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
109 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
110 + " Vendor code: " + e.getErrorCode());
111 } else {
112 LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
113 + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
114 + " Vendor code: " + e.getErrorCode());
115 JDBCPersistenceAdapter.log("Failure details: ", e);
116 }
117 }
118 }
119 c.getConnection().commit();
120 } finally {
121 cleanupExclusiveLock.writeLock().unlock();
122 try {
123 s.close();
124 } catch (Throwable e) {
125 }
126 }
127 }
128
129 public void doDropTables(TransactionContext c) throws SQLException, IOException {
130 Statement s = null;
131 cleanupExclusiveLock.writeLock().lock();
132 try {
133 s = c.getConnection().createStatement();
134 String[] dropStatments = this.statements.getDropSchemaStatements();
135 for (int i = 0; i < dropStatments.length; i++) {
136 // This will fail usually since the tables will be
137 // created already.
138 try {
139 LOG.debug("Executing SQL: " + dropStatments[i]);
140 s.execute(dropStatments[i]);
141 } catch (SQLException e) {
142 LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
143 + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
144 + e.getErrorCode());
145 JDBCPersistenceAdapter.log("Failure details: ", e);
146 }
147 }
148 c.getConnection().commit();
149 } finally {
150 cleanupExclusiveLock.writeLock().unlock();
151 try {
152 s.close();
153 } catch (Throwable e) {
154 }
155 }
156 }
157
158 public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
159 PreparedStatement s = null;
160 ResultSet rs = null;
161 cleanupExclusiveLock.readLock().lock();
162 try {
163 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
164 rs = s.executeQuery();
165 long seq1 = 0;
166 if (rs.next()) {
167 seq1 = rs.getLong(1);
168 }
169 rs.close();
170 s.close();
171 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
172 rs = s.executeQuery();
173 long seq2 = 0;
174 if (rs.next()) {
175 seq2 = rs.getLong(1);
176 }
177 long seq = Math.max(seq1, seq2);
178 return seq;
179 } finally {
180 cleanupExclusiveLock.readLock().unlock();
181 close(rs);
182 close(s);
183 }
184 }
185
186 public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
187 PreparedStatement s = null;
188 ResultSet rs = null;
189 cleanupExclusiveLock.readLock().lock();
190 try {
191 s = c.getConnection().prepareStatement(
192 this.statements.getFindMessageByIdStatement());
193 s.setLong(1, storeSequenceId);
194 rs = s.executeQuery();
195 if (!rs.next()) {
196 return null;
197 }
198 return getBinaryData(rs, 1);
199 } finally {
200 cleanupExclusiveLock.readLock().unlock();
201 close(rs);
202 close(s);
203 }
204 }
205
206
207 /**
208 * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
209 */
210 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
211 long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
212 PreparedStatement s = c.getAddMessageStatement();
213 cleanupExclusiveLock.readLock().lock();
214 try {
215 if (s == null) {
216 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
217 if (this.batchStatments) {
218 c.setAddMessageStatement(s);
219 }
220 }
221 s.setLong(1, sequence);
222 s.setString(2, messageID.getProducerId().toString());
223 s.setLong(3, messageID.getProducerSequenceId());
224 s.setString(4, destination.getQualifiedName());
225 s.setLong(5, expiration);
226 s.setLong(6, priority);
227 setBinaryData(s, 7, data);
228 if (xid != null) {
229 byte[] xidVal = xid.getEncodedXidBytes();
230 xidVal[0] = '+';
231 setBinaryData(s, 8, xidVal);
232 } else {
233 setBinaryData(s, 8, null);
234 }
235 if (this.batchStatments) {
236 s.addBatch();
237 } else if (s.executeUpdate() != 1) {
238 throw new SQLException("Failed add a message");
239 }
240 } finally {
241 cleanupExclusiveLock.readLock().unlock();
242 if (!this.batchStatments) {
243 if (s != null) {
244 s.close();
245 }
246 }
247 }
248 }
249
250 public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
251 long expirationTime, String messageRef) throws SQLException, IOException {
252 PreparedStatement s = c.getAddMessageStatement();
253 cleanupExclusiveLock.readLock().lock();
254 try {
255 if (s == null) {
256 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
257 if (this.batchStatments) {
258 c.setAddMessageStatement(s);
259 }
260 }
261 s.setLong(1, messageID.getBrokerSequenceId());
262 s.setString(2, messageID.getProducerId().toString());
263 s.setLong(3, messageID.getProducerSequenceId());
264 s.setString(4, destination.getQualifiedName());
265 s.setLong(5, expirationTime);
266 s.setString(6, messageRef);
267 if (this.batchStatments) {
268 s.addBatch();
269 } else if (s.executeUpdate() != 1) {
270 throw new SQLException("Failed add a message");
271 }
272 } finally {
273 cleanupExclusiveLock.readLock().unlock();
274 if (!this.batchStatments) {
275 s.close();
276 }
277 }
278 }
279
280 public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
281 PreparedStatement s = null;
282 ResultSet rs = null;
283 cleanupExclusiveLock.readLock().lock();
284 try {
285 s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
286 s.setString(1, messageID.getProducerId().toString());
287 s.setLong(2, messageID.getProducerSequenceId());
288 s.setString(3, destination.getQualifiedName());
289 rs = s.executeQuery();
290 if (!rs.next()) {
291 return new long[]{0,0};
292 }
293 return new long[]{rs.getLong(1), rs.getLong(2)};
294 } finally {
295 cleanupExclusiveLock.readLock().unlock();
296 close(rs);
297 close(s);
298 }
299 }
300
301 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
302 PreparedStatement s = null;
303 ResultSet rs = null;
304 cleanupExclusiveLock.readLock().lock();
305 try {
306 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
307 s.setString(1, id.getProducerId().toString());
308 s.setLong(2, id.getProducerSequenceId());
309 rs = s.executeQuery();
310 if (!rs.next()) {
311 return null;
312 }
313 return getBinaryData(rs, 1);
314 } finally {
315 cleanupExclusiveLock.readLock().unlock();
316 close(rs);
317 close(s);
318 }
319 }
320
321 public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
322 PreparedStatement s = null;
323 ResultSet rs = null;
324 cleanupExclusiveLock.readLock().lock();
325 try {
326 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
327 s.setLong(1, seq);
328 rs = s.executeQuery();
329 if (!rs.next()) {
330 return null;
331 }
332 return rs.getString(1);
333 } finally {
334 cleanupExclusiveLock.readLock().unlock();
335 close(rs);
336 close(s);
337 }
338 }
339
340 /**
341 * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
342 */
343 public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException {
344 PreparedStatement s = c.getRemovedMessageStatement();
345 cleanupExclusiveLock.readLock().lock();
346 try {
347 if (s == null) {
348 s = c.getConnection().prepareStatement(xid == null ?
349 this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement());
350 if (this.batchStatments) {
351 c.setRemovedMessageStatement(s);
352 }
353 }
354 if (xid == null) {
355 s.setLong(1, seq);
356 } else {
357 byte[] xidVal = xid.getEncodedXidBytes();
358 xidVal[0] = '-';
359 setBinaryData(s, 1, xidVal);
360 s.setLong(2, seq);
361 }
362 if (this.batchStatments) {
363 s.addBatch();
364 } else if (s.executeUpdate() != 1) {
365 throw new SQLException("Failed to remove message");
366 }
367 } finally {
368 cleanupExclusiveLock.readLock().unlock();
369 if (!this.batchStatments && s != null) {
370 s.close();
371 }
372 }
373 }
374
375 public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
376 throws Exception {
377 PreparedStatement s = null;
378 ResultSet rs = null;
379 cleanupExclusiveLock.readLock().lock();
380 try {
381 s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
382 s.setString(1, destination.getQualifiedName());
383 rs = s.executeQuery();
384 if (this.statements.isUseExternalMessageReferences()) {
385 while (rs.next()) {
386 if (!listener.recoverMessageReference(rs.getString(2))) {
387 break;
388 }
389 }
390 } else {
391 while (rs.next()) {
392 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
393 break;
394 }
395 }
396 }
397 } finally {
398 cleanupExclusiveLock.readLock().unlock();
399 close(rs);
400 close(s);
401 }
402 }
403
404 public void doMessageIdScan(TransactionContext c, int limit,
405 JDBCMessageIdScanListener listener) throws SQLException, IOException {
406 PreparedStatement s = null;
407 ResultSet rs = null;
408 cleanupExclusiveLock.readLock().lock();
409 try {
410 s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
411 s.setMaxRows(limit);
412 rs = s.executeQuery();
413 // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
414 LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
415 while (rs.next()) {
416 reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
417 }
418 if (LOG.isDebugEnabled()) {
419 LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
420 }
421 for (MessageId id : reverseOrderIds) {
422 listener.messageId(id);
423 }
424 } finally {
425 cleanupExclusiveLock.readLock().unlock();
426 close(rs);
427 close(s);
428 }
429 }
430
431 public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
432 String subscriptionName, long seq, long priority) throws SQLException, IOException {
433 PreparedStatement s = c.getUpdateLastAckStatement();
434 cleanupExclusiveLock.readLock().lock();
435 try {
436 if (s == null) {
437 s = c.getConnection().prepareStatement(xid == null ?
438 this.statements.getUpdateDurableLastAckWithPriorityStatement() :
439 this.statements.getUpdateDurableLastAckWithPriorityInTxStatement());
440 if (this.batchStatments) {
441 c.setUpdateLastAckStatement(s);
442 }
443 }
444 if (xid != null) {
445 byte[] xidVal = encodeXid(xid, seq, priority);
446 setBinaryData(s, 1, xidVal);
447 } else {
448 s.setLong(1, seq);
449 }
450 s.setString(2, destination.getQualifiedName());
451 s.setString(3, clientId);
452 s.setString(4, subscriptionName);
453 s.setLong(5, priority);
454 if (this.batchStatments) {
455 s.addBatch();
456 } else if (s.executeUpdate() != 1) {
457 throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName);
458 }
459 } finally {
460 cleanupExclusiveLock.readLock().unlock();
461 if (!this.batchStatments) {
462 close(s);
463 }
464 }
465 }
466
467
468 public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
469 String subscriptionName, long seq, long priority) throws SQLException, IOException {
470 PreparedStatement s = c.getUpdateLastAckStatement();
471 cleanupExclusiveLock.readLock().lock();
472 try {
473 if (s == null) {
474 s = c.getConnection().prepareStatement(xid == null ?
475 this.statements.getUpdateDurableLastAckStatement() :
476 this.statements.getUpdateDurableLastAckInTxStatement());
477 if (this.batchStatments) {
478 c.setUpdateLastAckStatement(s);
479 }
480 }
481 if (xid != null) {
482 byte[] xidVal = encodeXid(xid, seq, priority);
483 setBinaryData(s, 1, xidVal);
484 } else {
485 s.setLong(1, seq);
486 }
487 s.setString(2, destination.getQualifiedName());
488 s.setString(3, clientId);
489 s.setString(4, subscriptionName);
490
491 if (this.batchStatments) {
492 s.addBatch();
493 } else if (s.executeUpdate() != 1) {
494 throw new IOException("Could not update last ack seq : "
495 + seq + ", for sub: " + subscriptionName);
496 }
497 } finally {
498 cleanupExclusiveLock.readLock().unlock();
499 if (!this.batchStatments) {
500 close(s);
501 }
502 }
503 }
504
505 private byte[] encodeXid(XATransactionId xid, long seq, long priority) {
506 byte[] xidVal = xid.getEncodedXidBytes();
507 // encode the update
508 DataByteArrayOutputStream outputStream = xid.internalOutputStream();
509 outputStream.position(1);
510 outputStream.writeLong(seq);
511 outputStream.writeByte(Long.valueOf(priority).byteValue());
512 return xidVal;
513 }
514
515 @Override
516 public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException {
517 PreparedStatement s = null;
518 cleanupExclusiveLock.readLock().lock();
519 try {
520 s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement());
521 s.setString(1, destination.getQualifiedName());
522 s.setString(2, clientId);
523 s.setString(3, subName);
524 s.setLong(4, priority);
525 if (s.executeUpdate() != 1) {
526 throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName);
527 }
528 } finally {
529 cleanupExclusiveLock.readLock().unlock();
530 close(s);
531 }
532 }
533
534 public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
535 String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
536 // dumpTables(c,
537 // destination.getQualifiedName(),clientId,subscriptionName);
538 PreparedStatement s = null;
539 ResultSet rs = null;
540 cleanupExclusiveLock.readLock().lock();
541 try {
542 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
543 s.setString(1, destination.getQualifiedName());
544 s.setString(2, clientId);
545 s.setString(3, subscriptionName);
546 rs = s.executeQuery();
547 if (this.statements.isUseExternalMessageReferences()) {
548 while (rs.next()) {
549 if (!listener.recoverMessageReference(rs.getString(2))) {
550 break;
551 }
552 }
553 } else {
554 while (rs.next()) {
555 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
556 break;
557 }
558 }
559 }
560 } finally {
561 cleanupExclusiveLock.readLock().unlock();
562 close(rs);
563 close(s);
564 }
565 }
566
567 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
568 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
569
570 PreparedStatement s = null;
571 ResultSet rs = null;
572 cleanupExclusiveLock.readLock().lock();
573 try {
574 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
575 s.setMaxRows(Math.max(maxReturned * 2, maxRows));
576 s.setString(1, destination.getQualifiedName());
577 s.setString(2, clientId);
578 s.setString(3, subscriptionName);
579 s.setLong(4, seq);
580 rs = s.executeQuery();
581 int count = 0;
582 if (this.statements.isUseExternalMessageReferences()) {
583 while (rs.next() && count < maxReturned) {
584 if (listener.recoverMessageReference(rs.getString(1))) {
585 count++;
586 }
587 }
588 } else {
589 while (rs.next() && count < maxReturned) {
590 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
591 count++;
592 }
593 }
594 }
595 } finally {
596 cleanupExclusiveLock.readLock().unlock();
597 close(rs);
598 close(s);
599 }
600 }
601
602 public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
603 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
604
605 PreparedStatement s = null;
606 ResultSet rs = null;
607 cleanupExclusiveLock.readLock().lock();
608 try {
609 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
610 s.setMaxRows(Math.max(maxReturned * 2, maxRows));
611 s.setString(1, destination.getQualifiedName());
612 s.setString(2, clientId);
613 s.setString(3, subscriptionName);
614 s.setLong(4, seq);
615 s.setLong(5, priority);
616 rs = s.executeQuery();
617 int count = 0;
618 if (this.statements.isUseExternalMessageReferences()) {
619 while (rs.next() && count < maxReturned) {
620 if (listener.recoverMessageReference(rs.getString(1))) {
621 count++;
622 }
623 }
624 } else {
625 while (rs.next() && count < maxReturned) {
626 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
627 count++;
628 }
629 }
630 }
631 } finally {
632 cleanupExclusiveLock.readLock().unlock();
633 close(rs);
634 close(s);
635 }
636 }
637
638 public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
639 String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
640 PreparedStatement s = null;
641 ResultSet rs = null;
642 int result = 0;
643 cleanupExclusiveLock.readLock().lock();
644 try {
645 if (isPrioritizedMessages) {
646 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
647 } else {
648 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
649 }
650 s.setString(1, destination.getQualifiedName());
651 s.setString(2, clientId);
652 s.setString(3, subscriptionName);
653 rs = s.executeQuery();
654 if (rs.next()) {
655 result = rs.getInt(1);
656 }
657 } finally {
658 cleanupExclusiveLock.readLock().unlock();
659 close(rs);
660 close(s);
661 }
662 return result;
663 }
664
665 /**
666 * @param c
667 * @param info
668 * @param retroactive
669 * @throws SQLException
670 * @throws IOException
671 */
672 public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
673 throws SQLException, IOException {
674 // dumpTables(c, destination.getQualifiedName(), clientId,
675 // subscriptionName);
676 PreparedStatement s = null;
677 cleanupExclusiveLock.readLock().lock();
678 try {
679 long lastMessageId = -1;
680 if (!retroactive) {
681 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
682 ResultSet rs = null;
683 try {
684 rs = s.executeQuery();
685 if (rs.next()) {
686 lastMessageId = rs.getLong(1);
687 }
688 } finally {
689 close(rs);
690 close(s);
691 }
692 }
693 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
694 int maxPriority = 1;
695 if (isPrioritizedMessages) {
696 maxPriority = 10;
697 }
698
699 for (int priority = 0; priority < maxPriority; priority++) {
700 s.setString(1, info.getDestination().getQualifiedName());
701 s.setString(2, info.getClientId());
702 s.setString(3, info.getSubscriptionName());
703 s.setString(4, info.getSelector());
704 s.setLong(5, lastMessageId);
705 s.setString(6, info.getSubscribedDestination().getQualifiedName());
706 s.setLong(7, priority);
707
708 if (s.executeUpdate() != 1) {
709 throw new IOException("Could not create durable subscription for: " + info.getClientId());
710 }
711 }
712
713 } finally {
714 cleanupExclusiveLock.readLock().unlock();
715 close(s);
716 }
717 }
718
719 public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
720 String clientId, String subscriptionName) throws SQLException, IOException {
721 PreparedStatement s = null;
722 ResultSet rs = null;
723 cleanupExclusiveLock.readLock().lock();
724 try {
725 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
726 s.setString(1, destination.getQualifiedName());
727 s.setString(2, clientId);
728 s.setString(3, subscriptionName);
729 rs = s.executeQuery();
730 if (!rs.next()) {
731 return null;
732 }
733 SubscriptionInfo subscription = new SubscriptionInfo();
734 subscription.setDestination(destination);
735 subscription.setClientId(clientId);
736 subscription.setSubscriptionName(subscriptionName);
737 subscription.setSelector(rs.getString(1));
738 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
739 ActiveMQDestination.QUEUE_TYPE));
740 return subscription;
741 } finally {
742 cleanupExclusiveLock.readLock().unlock();
743 close(rs);
744 close(s);
745 }
746 }
747
748 public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
749 throws SQLException, IOException {
750 PreparedStatement s = null;
751 ResultSet rs = null;
752 cleanupExclusiveLock.readLock().lock();
753 try {
754 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
755 s.setString(1, destination.getQualifiedName());
756 rs = s.executeQuery();
757 ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
758 while (rs.next()) {
759 SubscriptionInfo subscription = new SubscriptionInfo();
760 subscription.setDestination(destination);
761 subscription.setSelector(rs.getString(1));
762 subscription.setSubscriptionName(rs.getString(2));
763 subscription.setClientId(rs.getString(3));
764 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
765 ActiveMQDestination.QUEUE_TYPE));
766 rc.add(subscription);
767 }
768 return rc.toArray(new SubscriptionInfo[rc.size()]);
769 } finally {
770 cleanupExclusiveLock.readLock().unlock();
771 close(rs);
772 close(s);
773 }
774 }
775
776 public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
777 IOException {
778 PreparedStatement s = null;
779 cleanupExclusiveLock.readLock().lock();
780 try {
781 s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
782 s.setString(1, destinationName.getQualifiedName());
783 s.executeUpdate();
784 s.close();
785 s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
786 s.setString(1, destinationName.getQualifiedName());
787 s.executeUpdate();
788 } finally {
789 cleanupExclusiveLock.readLock().unlock();
790 close(s);
791 }
792 }
793
794 public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
795 String subscriptionName) throws SQLException, IOException {
796 PreparedStatement s = null;
797 cleanupExclusiveLock.readLock().lock();
798 try {
799 s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
800 s.setString(1, destination.getQualifiedName());
801 s.setString(2, clientId);
802 s.setString(3, subscriptionName);
803 s.executeUpdate();
804 } finally {
805 cleanupExclusiveLock.readLock().unlock();
806 close(s);
807 }
808 }
809
810 char priorityIterator = 0; // unsigned
811 public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
812 PreparedStatement s = null;
813 cleanupExclusiveLock.writeLock().lock();
814 try {
815 LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
816 s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
817 int priority = priorityIterator++%10;
818 s.setInt(1, priority);
819 s.setInt(2, priority);
820 int i = s.executeUpdate();
821 LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
822 } finally {
823 cleanupExclusiveLock.writeLock().unlock();
824 close(s);
825 }
826 }
827
828 public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
829 String clientId, String subscriberName) throws SQLException, IOException {
830 PreparedStatement s = null;
831 ResultSet rs = null;
832 long result = -1;
833 cleanupExclusiveLock.readLock().lock();
834 try {
835 s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
836 s.setString(1, destination.getQualifiedName());
837 s.setString(2, clientId);
838 s.setString(3, subscriberName);
839 rs = s.executeQuery();
840 if (rs.next()) {
841 result = rs.getLong(1);
842 if (result == 0 && rs.wasNull()) {
843 result = -1;
844 }
845 }
846 } finally {
847 cleanupExclusiveLock.readLock().unlock();
848 close(rs);
849 close(s);
850 }
851 return result;
852 }
853
854 protected static void close(PreparedStatement s) {
855 try {
856 s.close();
857 } catch (Throwable e) {
858 }
859 }
860
861 protected static void close(ResultSet rs) {
862 try {
863 rs.close();
864 } catch (Throwable e) {
865 }
866 }
867
868 public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
869 HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
870 PreparedStatement s = null;
871 ResultSet rs = null;
872 cleanupExclusiveLock.readLock().lock();
873 try {
874 s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
875 rs = s.executeQuery();
876 while (rs.next()) {
877 rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
878 }
879 } finally {
880 cleanupExclusiveLock.readLock().unlock();
881 close(rs);
882 close(s);
883 }
884 return rc;
885 }
886
887 /**
888 * @return true if batchStements
889 */
890 public boolean isBatchStatments() {
891 return this.batchStatments;
892 }
893
894 /**
895 * @param batchStatments
896 */
897 public void setBatchStatments(boolean batchStatments) {
898 this.batchStatments = batchStatments;
899 }
900
901 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
902 this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
903 }
904
905 /**
906 * @return the statements
907 */
908 public Statements getStatements() {
909 return this.statements;
910 }
911
912 public void setStatements(Statements statements) {
913 this.statements = statements;
914 }
915
916 public int getMaxRows() {
917 return maxRows;
918 }
919
920 public void setMaxRows(int maxRows) {
921 this.maxRows = maxRows;
922 }
923
924 @Override
925 public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
926 PreparedStatement s = null;
927 cleanupExclusiveLock.readLock().lock();
928 try {
929 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
930 s.setString(1, destination.getQualifiedName());
931 s.setString(2, destination.getQualifiedName());
932 s.setString(3, destination.getQualifiedName());
933 s.setString(4, null);
934 s.setLong(5, 0);
935 s.setString(6, destination.getQualifiedName());
936 s.setLong(7, 11); // entry out of priority range
937
938 if (s.executeUpdate() != 1) {
939 throw new IOException("Could not create ack record for destination: " + destination);
940 }
941 } finally {
942 cleanupExclusiveLock.readLock().unlock();
943 close(s);
944 }
945 }
946
947 @Override
948 public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException {
949 PreparedStatement s = null;
950 ResultSet rs = null;
951 cleanupExclusiveLock.readLock().lock();
952 try {
953 s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement());
954 rs = s.executeQuery();
955 while (rs.next()) {
956 long id = rs.getLong(1);
957 byte[] encodedXid = getBinaryData(rs, 2);
958 if (encodedXid[0] == '+') {
959 jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3));
960 } else {
961 jdbcMemoryTransactionStore.recoverAck(id, encodedXid, getBinaryData(rs, 3));
962 }
963 }
964
965 close(rs);
966 close(s);
967
968 s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement());
969 rs = s.executeQuery();
970 while (rs.next()) {
971 byte[] encodedXid = getBinaryData(rs, 1);
972 String destination = rs.getString(2);
973 String subName = rs.getString(3);
974 String subId = rs.getString(4);
975 jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
976 ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE),
977 subName, subId);
978 }
979 } finally {
980 close(rs);
981 cleanupExclusiveLock.readLock().unlock();
982 close(s);
983 }
984 }
985
986 @Override
987 public void doCommitAddOp(TransactionContext c, long sequence) throws SQLException, IOException {
988 PreparedStatement s = null;
989 cleanupExclusiveLock.readLock().lock();
990 try {
991 s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement());
992 s.setLong(1, sequence);
993 if (s.executeUpdate() != 1) {
994 throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence);
995 }
996 } finally {
997 cleanupExclusiveLock.readLock().unlock();
998 close(s);
999 }
1000 }
1001
1002
1003 public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
1004 IOException {
1005 PreparedStatement s = null;
1006 ResultSet rs = null;
1007 int result = 0;
1008 cleanupExclusiveLock.readLock().lock();
1009 try {
1010 s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
1011 s.setString(1, destination.getQualifiedName());
1012 rs = s.executeQuery();
1013 if (rs.next()) {
1014 result = rs.getInt(1);
1015 }
1016 } finally {
1017 cleanupExclusiveLock.readLock().unlock();
1018 close(rs);
1019 close(s);
1020 }
1021 return result;
1022 }
1023
1024 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
1025 long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
1026 PreparedStatement s = null;
1027 ResultSet rs = null;
1028 cleanupExclusiveLock.readLock().lock();
1029 try {
1030 if (isPrioritizedMessages) {
1031 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
1032 } else {
1033 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
1034 }
1035 s.setMaxRows(Math.max(maxReturned * 2, maxRows));
1036 s.setString(1, destination.getQualifiedName());
1037 s.setLong(2, nextSeq);
1038 if (isPrioritizedMessages) {
1039 s.setLong(3, priority);
1040 s.setLong(4, priority);
1041 }
1042 rs = s.executeQuery();
1043 int count = 0;
1044 if (this.statements.isUseExternalMessageReferences()) {
1045 while (rs.next() && count < maxReturned) {
1046 if (listener.recoverMessageReference(rs.getString(1))) {
1047 count++;
1048 } else {
1049 LOG.debug("Stopped recover next messages");
1050 break;
1051 }
1052 }
1053 } else {
1054 while (rs.next() && count < maxReturned) {
1055 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
1056 count++;
1057 } else {
1058 LOG.debug("Stopped recover next messages");
1059 break;
1060 }
1061 }
1062 }
1063 } catch (Exception e) {
1064 e.printStackTrace();
1065 } finally {
1066 cleanupExclusiveLock.readLock().unlock();
1067 close(rs);
1068 close(s);
1069 }
1070 }
1071
1072 public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1073 throws SQLException, IOException {
1074 PreparedStatement s = null;
1075 ResultSet rs = null;
1076 cleanupExclusiveLock.readLock().lock();
1077 try {
1078 s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1079 s.setString(1, id.toString());
1080 rs = s.executeQuery();
1081 long seq = -1;
1082 if (rs.next()) {
1083 seq = rs.getLong(1);
1084 }
1085 return seq;
1086 } finally {
1087 cleanupExclusiveLock.readLock().unlock();
1088 close(rs);
1089 close(s);
1090 }
1091 }
1092
1093 /* public static void dumpTables(Connection c, String destinationName, String clientId, String
1094 subscriptionName) throws SQLException {
1095 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
1096 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
1097 PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM "
1098 + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D "
1099 + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
1100 + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
1101 + " ORDER BY M.ID");
1102 s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
1103 printQuery(s,System.out); }
1104
1105 public static void dumpTables(java.sql.Connection c) throws SQLException {
1106 printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out);
1107 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
1108 }
1109
1110 private static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
1111 throws SQLException {
1112 printQuery(c.prepareStatement(query), out);
1113 }
1114
1115 private static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
1116 throws SQLException {
1117
1118 ResultSet set = null;
1119 try {
1120 set = s.executeQuery();
1121 java.sql.ResultSetMetaData metaData = set.getMetaData();
1122 for (int i = 1; i <= metaData.getColumnCount(); i++) {
1123 if (i == 1)
1124 out.print("||");
1125 out.print(metaData.getColumnName(i) + "||");
1126 }
1127 out.println();
1128 while (set.next()) {
1129 for (int i = 1; i <= metaData.getColumnCount(); i++) {
1130 if (i == 1)
1131 out.print("|");
1132 out.print(set.getString(i) + "|");
1133 }
1134 out.println();
1135 }
1136 } finally {
1137 try {
1138 set.close();
1139 } catch (Throwable ignore) {
1140 }
1141 try {
1142 s.close();
1143 } catch (Throwable ignore) {
1144 }
1145 }
1146 } */
1147
1148 }