001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc.adapter;
018
019import static javax.xml.bind.DatatypeConverter.parseBase64Binary;
020import static javax.xml.bind.DatatypeConverter.printBase64Binary;
021
022import java.io.IOException;
023import java.sql.Connection;
024import java.sql.PreparedStatement;
025import java.sql.ResultSet;
026import java.sql.SQLException;
027import java.sql.Statement;
028import java.util.ArrayList;
029import java.util.HashSet;
030import java.util.LinkedList;
031import java.util.Set;
032import java.util.concurrent.locks.ReadWriteLock;
033import java.util.concurrent.locks.ReentrantReadWriteLock;
034
035import org.apache.activemq.command.ActiveMQDestination;
036import org.apache.activemq.command.MessageId;
037import org.apache.activemq.command.ProducerId;
038import org.apache.activemq.command.SubscriptionInfo;
039import org.apache.activemq.command.XATransactionId;
040import org.apache.activemq.store.jdbc.JDBCAdapter;
041import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
042import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
043import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
044import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
045import org.apache.activemq.store.jdbc.Statements;
046import org.apache.activemq.store.jdbc.TransactionContext;
047import org.apache.activemq.util.DataByteArrayOutputStream;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051/**
052 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
053 * encouraged to override the default implementation of methods to account for differences in JDBC Driver
054 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
055 * The databases/JDBC drivers that use this adapter are:
056 * <ul>
057 * <li></li>
058 * </ul>
059 *
060 * @org.apache.xbean.XBean element="defaultJDBCAdapter"
061 *
062 *
063 */
064public class DefaultJDBCAdapter implements JDBCAdapter {
065    private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
066    public static final int MAX_ROWS = org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE;
067    private static final String FAILURE_MESSAGE = "Failure was: %s Message: %s SQLState: %s Vendor code: %s";
068    protected Statements statements;
069    private boolean batchStatements = true;
070    //This is deprecated and should be removed in a future release
071    protected boolean batchStatments = true;
072    protected boolean prioritizedMessages;
073    protected int maxRows = MAX_ROWS;
074
075    protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
076        s.setBytes(index, data);
077    }
078
079    protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
080        return rs.getBytes(index);
081    }
082
083    @Override
084    public void doCreateTables(TransactionContext transactionContext) throws SQLException, IOException {
085        // Check to see if the table already exists. If it does, then don't log warnings during startup.
086        // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version of the table
087        boolean messageTableAlreadyExists = messageTableAlreadyExists(transactionContext);
088
089        for (String createStatement : this.statements.getCreateSchemaStatements()) {
090            // This will fail usually since the tables will be
091            // created already.
092            executeStatement(transactionContext, createStatement, messageTableAlreadyExists);
093        }
094    }
095
096    private boolean messageTableAlreadyExists(TransactionContext transactionContext) {
097        boolean alreadyExists = false;
098        ResultSet rs = null;
099        try {
100            rs = transactionContext.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(), new String[] { "TABLE" });
101            alreadyExists = rs.next();
102        } catch (Throwable ignore) {
103        } finally {
104            close(rs);
105        }
106        return alreadyExists;
107    }
108
109    private void executeStatement(TransactionContext transactionContext, String createStatement, boolean ignoreStatementExecutionFailure) throws IOException {
110        Statement statement = null;
111        try {
112            LOG.debug("Executing SQL: " + createStatement);
113            statement = transactionContext.getConnection().createStatement();
114            statement.execute(createStatement);
115
116            commitIfAutoCommitIsDisabled(transactionContext);
117        } catch (SQLException e) {
118            if (ignoreStatementExecutionFailure) {
119                LOG.debug("Could not create JDBC tables; The message table already existed. " + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), e.getSQLState(), e.getErrorCode()));
120            } else {
121                LOG.warn("Could not create JDBC tables; they could already exist. " + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), e.getSQLState(), e.getErrorCode()));
122                JDBCPersistenceAdapter.log("Failure details: ", e);
123            }
124        } finally {
125            closeStatement(statement);
126        }
127    }
128
129    private void closeStatement(Statement statement) {
130        try {
131            if (statement != null) {
132                statement.close();
133            }
134        } catch (SQLException ignored) {}
135    }
136
137    private void commitIfAutoCommitIsDisabled(TransactionContext c) throws SQLException, IOException {
138        if (!c.getConnection().getAutoCommit()) {
139            c.getConnection().commit();
140        }
141    }
142
143    @Override
144    public void doDropTables(TransactionContext c) throws SQLException, IOException {
145        Statement s = null;
146        try {
147            s = c.getConnection().createStatement();
148            String[] dropStatments = this.statements.getDropSchemaStatements();
149            for (int i = 0; i < dropStatments.length; i++) {
150                // This will fail usually since the tables will be
151                // created already.
152                try {
153                    LOG.debug("Executing SQL: " + dropStatments[i]);
154                    s.execute(dropStatments[i]);
155                } catch (SQLException e) {
156                    LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
157                            + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
158                            + e.getErrorCode());
159                    JDBCPersistenceAdapter.log("Failure details: ", e);
160                }
161            }
162            commitIfAutoCommitIsDisabled(c);
163        } finally {
164            try {
165                s.close();
166            } catch (Throwable e) {
167            }
168        }
169    }
170
171    @Override
172    public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
173        PreparedStatement s = null;
174        ResultSet rs = null;
175        try {
176            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
177            rs = s.executeQuery();
178            long seq1 = 0;
179            if (rs.next()) {
180                seq1 = rs.getLong(1);
181            }
182            rs.close();
183            s.close();
184            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
185            rs = s.executeQuery();
186            long seq2 = 0;
187            if (rs.next()) {
188                seq2 = rs.getLong(1);
189            }
190            long seq = Math.max(seq1, seq2);
191            return seq;
192        } finally {
193            close(rs);
194            close(s);
195        }
196    }
197
198    @Override
199    public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
200        PreparedStatement s = null;
201        ResultSet rs = null;
202        try {
203            s = c.getConnection().prepareStatement(
204                    this.statements.getFindMessageByIdStatement());
205            s.setLong(1, storeSequenceId);
206            rs = s.executeQuery();
207            if (!rs.next()) {
208                return null;
209            }
210            return getBinaryData(rs, 1);
211        } finally {
212            close(rs);
213            close(s);
214        }
215    }
216
217
218    /**
219     * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
220     */
221    @Override
222    public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
223                             long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
224        PreparedStatement s = c.getAddMessageStatement();
225        try {
226            if (s == null) {
227                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
228                if (this.batchStatements) {
229                    c.setAddMessageStatement(s);
230                }
231            }
232            s.setLong(1, sequence);
233            s.setString(2, messageID.getProducerId().toString());
234            s.setLong(3, messageID.getProducerSequenceId());
235            s.setString(4, destination.getQualifiedName());
236            s.setLong(5, expiration);
237            s.setLong(6, priority);
238            setBinaryData(s, 7, data);
239            if (xid != null) {
240                byte[] xidVal = xid.getEncodedXidBytes();
241                xidVal[0] = '+';
242                String xidString = printBase64Binary(xidVal);
243                s.setString(8, xidString);
244            } else {
245                s.setString(8, null);
246            }
247            if (this.batchStatements) {
248                s.addBatch();
249            } else if (s.executeUpdate() != 1) {
250                throw new SQLException("Failed add a message");
251            }
252        } finally {
253            if (!this.batchStatements) {
254                if (s != null) {
255                    s.close();
256                }
257            }
258        }
259    }
260
261    @Override
262    public void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException {
263        PreparedStatement s = null;
264        try {
265            s = c.getConnection().prepareStatement(this.statements.getUpdateMessageStatement());
266            setBinaryData(s, 1, data);
267            s.setString(2, id.getProducerId().toString());
268            s.setLong(3, id.getProducerSequenceId());
269            s.setString(4, destination.getQualifiedName());
270            if (s.executeUpdate() != 1) {
271                throw new IOException("Could not update message: " + id + " in " + destination);
272            }
273        } finally {
274            close(s);
275        }
276    }
277
278
279    @Override
280    public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
281            long expirationTime, String messageRef) throws SQLException, IOException {
282        PreparedStatement s = c.getAddMessageStatement();
283        try {
284            if (s == null) {
285                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
286                if (this.batchStatements) {
287                    c.setAddMessageStatement(s);
288                }
289            }
290            s.setLong(1, messageID.getBrokerSequenceId());
291            s.setString(2, messageID.getProducerId().toString());
292            s.setLong(3, messageID.getProducerSequenceId());
293            s.setString(4, destination.getQualifiedName());
294            s.setLong(5, expirationTime);
295            s.setString(6, messageRef);
296            if (this.batchStatements) {
297                s.addBatch();
298            } else if (s.executeUpdate() != 1) {
299                throw new SQLException("Failed add a message");
300            }
301        } finally {
302            if (!this.batchStatements) {
303                s.close();
304            }
305        }
306    }
307
308    @Override
309    public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
310        PreparedStatement s = null;
311        ResultSet rs = null;
312        try {
313            s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
314            s.setString(1, messageID.getProducerId().toString());
315            s.setLong(2, messageID.getProducerSequenceId());
316            s.setString(3, destination.getQualifiedName());
317            rs = s.executeQuery();
318            if (!rs.next()) {
319                return new long[]{0,0};
320            }
321            return new long[]{rs.getLong(1), rs.getLong(2)};
322        } finally {
323            close(rs);
324            close(s);
325        }
326    }
327
328    @Override
329    public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
330        PreparedStatement s = null;
331        ResultSet rs = null;
332        try {
333            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
334            s.setString(1, id.getProducerId().toString());
335            s.setLong(2, id.getProducerSequenceId());
336            rs = s.executeQuery();
337            if (!rs.next()) {
338                return null;
339            }
340            return getBinaryData(rs, 1);
341        } finally {
342            close(rs);
343            close(s);
344        }
345    }
346
347    @Override
348    public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
349        PreparedStatement s = null;
350        ResultSet rs = null;
351        try {
352            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
353            s.setLong(1, seq);
354            rs = s.executeQuery();
355            if (!rs.next()) {
356                return null;
357            }
358            return rs.getString(1);
359        } finally {
360            close(rs);
361            close(s);
362        }
363    }
364
365    /**
366     * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
367     */
368    @Override
369    public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException {
370        PreparedStatement s = c.getRemovedMessageStatement();
371        try {
372            if (s == null) {
373                s = c.getConnection().prepareStatement(xid == null ?
374                        this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement());
375                if (this.batchStatements) {
376                    c.setRemovedMessageStatement(s);
377                }
378            }
379            if (xid == null) {
380                s.setLong(1, seq);
381            } else {
382                byte[] xidVal = xid.getEncodedXidBytes();
383                xidVal[0] = '-';
384                String xidString = printBase64Binary(xidVal);
385                s.setString(1, xidString);
386                s.setLong(2, seq);
387            }
388            if (this.batchStatements) {
389                s.addBatch();
390            } else if (s.executeUpdate() != 1) {
391                throw new SQLException("Failed to remove message seq: " + seq);
392            }
393        } finally {
394            if (!this.batchStatements && s != null) {
395                s.close();
396            }
397        }
398    }
399
400    @Override
401    public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
402            throws Exception {
403        PreparedStatement s = null;
404        ResultSet rs = null;
405        try {
406            s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
407            s.setString(1, destination.getQualifiedName());
408            rs = s.executeQuery();
409            if (this.statements.isUseExternalMessageReferences()) {
410                while (rs.next()) {
411                    if (!listener.recoverMessageReference(rs.getString(2))) {
412                        break;
413                    }
414                }
415            } else {
416                while (rs.next()) {
417                    if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
418                        break;
419                    }
420                }
421            }
422        } finally {
423            close(rs);
424            close(s);
425        }
426    }
427
428    @Override
429    public void doMessageIdScan(TransactionContext c, int limit,
430            JDBCMessageIdScanListener listener) throws SQLException, IOException {
431        PreparedStatement s = null;
432        ResultSet rs = null;
433        try {
434            s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
435            s.setMaxRows(limit);
436            rs = s.executeQuery();
437            // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
438            LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
439            while (rs.next()) {
440                reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
441            }
442            if (LOG.isDebugEnabled()) {
443                LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
444            }
445            for (MessageId id : reverseOrderIds) {
446                listener.messageId(id);
447            }
448        } finally {
449            close(rs);
450            close(s);
451        }
452    }
453
454    @Override
455    public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
456                                         String subscriptionName, long seq, long priority) throws SQLException, IOException {
457        PreparedStatement s = c.getUpdateLastAckStatement();
458        try {
459            if (s == null) {
460                s = c.getConnection().prepareStatement(xid == null ?
461                        this.statements.getUpdateDurableLastAckWithPriorityStatement() :
462                        this.statements.getUpdateDurableLastAckWithPriorityInTxStatement());
463                if (this.batchStatements) {
464                    c.setUpdateLastAckStatement(s);
465                }
466            }
467            if (xid != null) {
468                byte[] xidVal = encodeXid(xid, seq, priority);
469                String xidString = printBase64Binary(xidVal);
470                s.setString(1, xidString);
471            } else {
472                s.setLong(1, seq);
473            }
474            s.setString(2, destination.getQualifiedName());
475            s.setString(3, clientId);
476            s.setString(4, subscriptionName);
477            s.setLong(5, priority);
478            if (this.batchStatements) {
479                s.addBatch();
480            } else if (s.executeUpdate() != 1) {
481                throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName);
482            }
483        } finally {
484            if (!this.batchStatements) {
485                close(s);
486            }
487        }
488    }
489
490
491    @Override
492    public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
493                             String subscriptionName, long seq, long priority) throws SQLException, IOException {
494        PreparedStatement s = c.getUpdateLastAckStatement();
495        try {
496            if (s == null) {
497                s = c.getConnection().prepareStatement(xid == null ?
498                        this.statements.getUpdateDurableLastAckStatement() :
499                        this.statements.getUpdateDurableLastAckInTxStatement());
500                if (this.batchStatements) {
501                    c.setUpdateLastAckStatement(s);
502                }
503            }
504            if (xid != null) {
505                byte[] xidVal = encodeXid(xid, seq, priority);
506                String xidString = printBase64Binary(xidVal);
507                s.setString(1, xidString);
508            } else {
509                s.setLong(1, seq);
510            }
511            s.setString(2, destination.getQualifiedName());
512            s.setString(3, clientId);
513            s.setString(4, subscriptionName);
514
515            if (this.batchStatements) {
516                s.addBatch();
517            } else if (s.executeUpdate() != 1) {
518                throw new IOException("Could not update last ack seq : "
519                            + seq + ", for sub: " + subscriptionName);
520            }
521        } finally {
522            if (!this.batchStatements) {
523                close(s);
524            }
525        }
526    }
527
528    private byte[] encodeXid(XATransactionId xid, long seq, long priority) {
529        byte[] xidVal = xid.getEncodedXidBytes();
530        // encode the update
531        DataByteArrayOutputStream outputStream = xid.internalOutputStream();
532        outputStream.position(1);
533        outputStream.writeLong(seq);
534        outputStream.writeByte(Long.valueOf(priority).byteValue());
535        return xidVal;
536    }
537
538    @Override
539    public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException {
540        PreparedStatement s = null;
541        try {
542            s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement());
543            s.setString(1, destination.getQualifiedName());
544            s.setString(2, clientId);
545            s.setString(3, subName);
546            s.setLong(4, priority);
547            if (s.executeUpdate() != 1) {
548                throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName);
549            }
550        } finally {
551            close(s);
552        }
553    }
554
555    @Override
556    public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
557            String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
558        // dumpTables(c,
559        // destination.getQualifiedName(),clientId,subscriptionName);
560        PreparedStatement s = null;
561        ResultSet rs = null;
562        try {
563            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
564            s.setString(1, destination.getQualifiedName());
565            s.setString(2, clientId);
566            s.setString(3, subscriptionName);
567            rs = s.executeQuery();
568            if (this.statements.isUseExternalMessageReferences()) {
569                while (rs.next()) {
570                    if (!listener.recoverMessageReference(rs.getString(2))) {
571                        break;
572                    }
573                }
574            } else {
575                while (rs.next()) {
576                    if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
577                        break;
578                    }
579                }
580            }
581        } finally {
582            close(rs);
583            close(s);
584        }
585    }
586
587    @Override
588    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
589            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
590
591        PreparedStatement s = null;
592        ResultSet rs = null;
593        try {
594            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
595            s.setMaxRows(Math.min(maxReturned * 2, maxRows));
596            s.setString(1, destination.getQualifiedName());
597            s.setString(2, clientId);
598            s.setString(3, subscriptionName);
599            s.setLong(4, seq);
600            rs = s.executeQuery();
601            int count = 0;
602            if (this.statements.isUseExternalMessageReferences()) {
603                while (rs.next() && count < maxReturned) {
604                    if (listener.recoverMessageReference(rs.getString(1))) {
605                        count++;
606                    }
607                }
608            } else {
609                while (rs.next() && count < maxReturned) {
610                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
611                        count++;
612                    }
613                }
614            }
615        } finally {
616            close(rs);
617            close(s);
618        }
619    }
620
621    @Override
622    public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
623            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
624
625        PreparedStatement s = null;
626        ResultSet rs = null;
627        try {
628            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
629            s.setMaxRows(Math.min(maxReturned * 2, maxRows));
630            s.setString(1, destination.getQualifiedName());
631            s.setString(2, clientId);
632            s.setString(3, subscriptionName);
633            s.setLong(4, seq);
634            s.setLong(5, priority);
635            rs = s.executeQuery();
636            int count = 0;
637            if (this.statements.isUseExternalMessageReferences()) {
638                while (rs.next() && count < maxReturned) {
639                    if (listener.recoverMessageReference(rs.getString(1))) {
640                        count++;
641                    }
642                }
643            } else {
644                while (rs.next() && count < maxReturned) {
645                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
646                        count++;
647                    }
648                }
649            }
650        } finally {
651            close(rs);
652            close(s);
653        }
654    }
655
656    @Override
657    public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
658            String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
659        PreparedStatement s = null;
660        ResultSet rs = null;
661        int result = 0;
662        try {
663            if (isPrioritizedMessages) {
664                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
665            } else {
666                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
667            }
668            s.setString(1, destination.getQualifiedName());
669            s.setString(2, clientId);
670            s.setString(3, subscriptionName);
671            rs = s.executeQuery();
672            if (rs.next()) {
673                result = rs.getInt(1);
674            }
675        } finally {
676            close(rs);
677            close(s);
678        }
679        return result;
680    }
681
682    /**
683     * @param c
684     * @param info
685     * @param retroactive
686     * @throws SQLException
687     * @throws IOException
688     */
689    @Override
690    public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
691            throws SQLException, IOException {
692        // dumpTables(c, destination.getQualifiedName(), clientId,
693        // subscriptionName);
694        PreparedStatement s = null;
695        try {
696            long lastMessageId = -1;
697            if (!retroactive) {
698                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
699                ResultSet rs = null;
700                try {
701                    rs = s.executeQuery();
702                    if (rs.next()) {
703                        lastMessageId = rs.getLong(1);
704                    }
705                } finally {
706                    close(rs);
707                    close(s);
708                }
709            }
710            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
711            int maxPriority = 1;
712            if (isPrioritizedMessages) {
713                maxPriority = 10;
714            }
715
716            for (int priority = 0; priority < maxPriority; priority++) {
717                s.setString(1, info.getDestination().getQualifiedName());
718                s.setString(2, info.getClientId());
719                s.setString(3, info.getSubscriptionName());
720                s.setString(4, info.getSelector());
721                s.setLong(5, lastMessageId);
722                s.setString(6, info.getSubscribedDestination().getQualifiedName());
723                s.setLong(7, priority);
724
725                if (s.executeUpdate() != 1) {
726                    throw new IOException("Could not create durable subscription for: " + info.getClientId());
727                }
728            }
729
730        } finally {
731            close(s);
732        }
733    }
734
735    @Override
736    public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
737            String clientId, String subscriptionName) throws SQLException, IOException {
738        PreparedStatement s = null;
739        ResultSet rs = null;
740        try {
741            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
742            s.setString(1, destination.getQualifiedName());
743            s.setString(2, clientId);
744            s.setString(3, subscriptionName);
745            rs = s.executeQuery();
746            if (!rs.next()) {
747                return null;
748            }
749            SubscriptionInfo subscription = new SubscriptionInfo();
750            subscription.setDestination(destination);
751            subscription.setClientId(clientId);
752            subscription.setSubscriptionName(subscriptionName);
753            subscription.setSelector(rs.getString(1));
754            subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
755                    ActiveMQDestination.QUEUE_TYPE));
756            return subscription;
757        } finally {
758            close(rs);
759            close(s);
760        }
761    }
762
763    @Override
764    public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
765            throws SQLException, IOException {
766        PreparedStatement s = null;
767        ResultSet rs = null;
768        try {
769            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
770            s.setString(1, destination.getQualifiedName());
771            rs = s.executeQuery();
772            ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
773            while (rs.next()) {
774                SubscriptionInfo subscription = new SubscriptionInfo();
775                subscription.setDestination(destination);
776                subscription.setSelector(rs.getString(1));
777                subscription.setSubscriptionName(rs.getString(2));
778                subscription.setClientId(rs.getString(3));
779                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
780                        ActiveMQDestination.QUEUE_TYPE));
781                rc.add(subscription);
782            }
783            return rc.toArray(new SubscriptionInfo[rc.size()]);
784        } finally {
785            close(rs);
786            close(s);
787        }
788    }
789
790    @Override
791    public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
792            IOException {
793        PreparedStatement s = null;
794        try {
795            s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
796            s.setString(1, destinationName.getQualifiedName());
797            s.executeUpdate();
798            s.close();
799            s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
800            s.setString(1, destinationName.getQualifiedName());
801            s.executeUpdate();
802        } finally {
803            close(s);
804        }
805    }
806
807    @Override
808    public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
809            String subscriptionName) throws SQLException, IOException {
810        PreparedStatement s = null;
811        try {
812            s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
813            s.setString(1, destination.getQualifiedName());
814            s.setString(2, clientId);
815            s.setString(3, subscriptionName);
816            s.executeUpdate();
817        } finally {
818            close(s);
819        }
820    }
821
822    char priorityIterator = 0; // unsigned
823    @Override
824    public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
825        PreparedStatement s = null;
826        try {
827            LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
828            s = c.getExclusiveConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
829            int priority = priorityIterator++%10;
830            s.setInt(1, priority);
831            s.setInt(2, priority);
832            int i = s.executeUpdate();
833            LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
834        } finally {
835            close(s);
836        }
837    }
838
839    @Override
840    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
841            String clientId, String subscriberName) throws SQLException, IOException {
842        PreparedStatement s = null;
843        ResultSet rs = null;
844        long result = -1;
845        try {
846            s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
847            s.setString(1, destination.getQualifiedName());
848            s.setString(2, clientId);
849            s.setString(3, subscriberName);
850            rs = s.executeQuery();
851            if (rs.next()) {
852                result = rs.getLong(1);
853                if (result == 0 && rs.wasNull()) {
854                    result = -1;
855                }
856            }
857        } finally {
858            close(rs);
859            close(s);
860        }
861        return result;
862    }
863
864    protected static void close(PreparedStatement s) {
865        try {
866            s.close();
867        } catch (Throwable e) {
868        }
869    }
870
871    protected static void close(ResultSet rs) {
872        try {
873            rs.close();
874        } catch (Throwable e) {
875        }
876    }
877
878    @Override
879    public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
880        HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
881        PreparedStatement s = null;
882        ResultSet rs = null;
883        try {
884            s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
885            rs = s.executeQuery();
886            while (rs.next()) {
887                rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
888            }
889        } finally {
890            close(rs);
891            close(s);
892        }
893        return rc;
894    }
895
896    /**
897     * @return true if batchStatements
898     */
899    public boolean isBatchStatements() {
900        return batchStatements;
901    }
902
903    /**
904     * Set the number of statements to process as a single batch DB update
905     * @param batchStatements
906     */
907    public void setBatchStatements(boolean batchStatements) {
908        this.batchStatements = batchStatements;
909        // The next lines are deprecated and should be removed in a future release
910        // and is here in case someone created their own
911       // this.batchStatments = batchStatements;
912    }
913
914    // Note - remove batchStatment in future distributions.  Here for backward compatibility
915    /**
916     * @return true if batchStements
917     */
918    public boolean isBatchStatments() {
919        return this.batchStatements;
920    }
921
922    /**
923     * This value batchStatments is deprecated and will be removed in a future release.  Use batchStatements instead (Note the 'e' in Statement)"
924     * @deprecated
925     * @param batchStatments
926     */
927    public void setBatchStatments(boolean batchStatments) {
928        LOG.warn("batchStatments is deprecated and will be removed in a future release.  Use batchStatements instead (Note the 'e' in Statement)");
929        this.batchStatements = batchStatments;
930        this.batchStatments = batchStatments;
931    }
932
933    @Override
934    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
935        this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
936    }
937
938    /**
939     * @return the statements
940     */
941    public Statements getStatements() {
942        return this.statements;
943    }
944
945    @Override
946    public void setStatements(Statements statements) {
947        this.statements = statements;
948    }
949
950    @Override
951    public int getMaxRows() {
952        return maxRows;
953    }
954
955    /**
956     * the max value for statement maxRows, used to limit jdbc queries
957     */
958    @Override
959    public void setMaxRows(int maxRows) {
960        this.maxRows = maxRows;
961    }
962
963    @Override
964    public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
965        PreparedStatement s = null;
966        try {
967            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
968            s.setString(1, destination.getQualifiedName());
969            s.setString(2, destination.getQualifiedName());
970            s.setString(3, destination.getQualifiedName());
971            s.setString(4, null);
972            s.setLong(5, 0);
973            s.setString(6, destination.getQualifiedName());
974            s.setLong(7, 11);  // entry out of priority range
975
976            if (s.executeUpdate() != 1) {
977                throw new IOException("Could not create ack record for destination: " + destination);
978            }
979        } finally {
980            close(s);
981        }
982    }
983
984    @Override
985    public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException {
986        PreparedStatement s = null;
987        ResultSet rs = null;
988        try {
989            s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement());
990            rs = s.executeQuery();
991            while (rs.next()) {
992                long id = rs.getLong(1);
993                String encodedString = rs.getString(2);
994                byte[] encodedXid = parseBase64Binary(encodedString);
995                if (encodedXid[0] == '+') {
996                    jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3));
997                } else {
998                    jdbcMemoryTransactionStore.recoverAck(id, encodedXid, getBinaryData(rs, 3));
999                }
1000            }
1001
1002            close(rs);
1003            close(s);
1004
1005            s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement());
1006            rs = s.executeQuery();
1007            while (rs.next()) {
1008                String encodedString = rs.getString(1);
1009                byte[] encodedXid = parseBase64Binary(encodedString);
1010                String destination = rs.getString(2);
1011                String subName = rs.getString(3);
1012                String subId = rs.getString(4);
1013                jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
1014                        ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE),
1015                        subName, subId);
1016            }
1017        } finally {
1018            close(rs);
1019            close(s);
1020        }
1021    }
1022
1023    @Override
1024    public void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException {
1025        PreparedStatement s = null;
1026        try {
1027            s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement());
1028            s.setLong(1, sequence);
1029            s.setLong(2, preparedSequence);
1030            if (s.executeUpdate() != 1) {
1031                throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence);
1032            }
1033        } finally {
1034            close(s);
1035        }
1036    }
1037
1038
1039    @Override
1040    public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
1041            IOException {
1042        PreparedStatement s = null;
1043        ResultSet rs = null;
1044        int result = 0;
1045        try {
1046            s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
1047            s.setString(1, destination.getQualifiedName());
1048            rs = s.executeQuery();
1049            if (rs.next()) {
1050                result = rs.getInt(1);
1051            }
1052        } finally {
1053            close(rs);
1054            close(s);
1055        }
1056        return result;
1057    }
1058
1059    @Override
1060    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long[] lastRecoveredEntries,
1061            long maxSeq, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
1062        PreparedStatement s = null;
1063        ResultSet rs = null;
1064        try {
1065            if (isPrioritizedMessages) {
1066                s = c.getConnection().prepareStatement(limitQuery(this.statements.getFindNextMessagesByPriorityStatement()));
1067            } else {
1068                s = c.getConnection().prepareStatement(limitQuery(this.statements.getFindNextMessagesStatement()));
1069            }
1070            s.setMaxRows(Math.min(maxReturned, maxRows));
1071            s.setString(1, destination.getQualifiedName());
1072            s.setLong(2, maxSeq);
1073            int paramId = 3;
1074            if (isPrioritizedMessages) {
1075                for (int i=9;i>=0;i--) {
1076                    s.setLong(paramId++, lastRecoveredEntries[i]);
1077                }
1078            } else {
1079                s.setLong(paramId, lastRecoveredEntries[0]);
1080            }
1081            rs = s.executeQuery();
1082            int count = 0;
1083            if (this.statements.isUseExternalMessageReferences()) {
1084                while (rs.next() && count < maxReturned) {
1085                    if (listener.recoverMessageReference(rs.getString(1))) {
1086                        count++;
1087                    } else {
1088                        LOG.debug("Stopped recover next messages");
1089                        break;
1090                    }
1091                }
1092            } else {
1093                while (rs.next() && count < maxReturned) {
1094                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
1095                        count++;
1096                    } else {
1097                        LOG.debug("Stopped recover next messages");
1098                        break;
1099                    }
1100                }
1101            }
1102        } catch (Exception e) {
1103            LOG.warn("Exception recovering next messages", e);
1104        } finally {
1105            close(rs);
1106            close(s);
1107        }
1108    }
1109
1110    @Override
1111    public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1112            throws SQLException, IOException {
1113        PreparedStatement s = null;
1114        ResultSet rs = null;
1115        try {
1116            s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1117            s.setString(1, id.toString());
1118            rs = s.executeQuery();
1119            long seq = -1;
1120            if (rs.next()) {
1121                seq = rs.getLong(1);
1122            }
1123            return seq;
1124        } finally {
1125            close(rs);
1126            close(s);
1127        }
1128    }
1129
1130    public static void dumpTables(Connection c, String destinationName, String clientId, String
1131      subscriptionName) throws SQLException {
1132        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
1133        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
1134        PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM "
1135                + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D "
1136                + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
1137                + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
1138                + " ORDER BY M.ID");
1139      s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
1140      printQuery(s,System.out); }
1141
1142    public static void dumpTables(java.sql.Connection c) throws SQLException {
1143        printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_MSGS", System.out);
1144
1145        //printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_ACKS", System.out);
1146
1147        //printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out);
1148        //printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
1149    }
1150
1151    public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
1152            throws SQLException {
1153        printQuery(c.prepareStatement(query), out);
1154    }
1155
1156    public static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
1157            throws SQLException {
1158
1159        ResultSet set = null;
1160        try {
1161            set = s.executeQuery();
1162            java.sql.ResultSetMetaData metaData = set.getMetaData();
1163            for (int i = 1; i <= metaData.getColumnCount(); i++) {
1164                if (i == 1)
1165                    out.print("||");
1166                out.print(metaData.getColumnName(i) + "||");
1167            }
1168            out.println();
1169            while (set.next()) {
1170                for (int i = 1; i <= metaData.getColumnCount(); i++) {
1171                    if (i == 1)
1172                        out.print("|");
1173                    out.print(set.getString(i) + "|");
1174                }
1175                out.println();
1176            }
1177        } finally {
1178            try {
1179                set.close();
1180            } catch (Throwable ignore) {
1181            }
1182            try {
1183                s.close();
1184            } catch (Throwable ignore) {
1185            }
1186        }
1187    }
1188
1189    @Override
1190    public String limitQuery(String query) {
1191        return query;
1192    }
1193}