001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc.adapter;
018
019import java.io.IOException;
020import java.sql.Connection;
021import java.sql.PreparedStatement;
022import java.sql.ResultSet;
023import java.sql.SQLException;
024import java.sql.Statement;
025import java.util.ArrayList;
026import java.util.HashSet;
027import java.util.LinkedList;
028import java.util.Set;
029import java.util.concurrent.locks.ReadWriteLock;
030import java.util.concurrent.locks.ReentrantReadWriteLock;
031
032import org.apache.activemq.command.ActiveMQDestination;
033import org.apache.activemq.command.MessageId;
034import org.apache.activemq.command.ProducerId;
035import org.apache.activemq.command.SubscriptionInfo;
036import org.apache.activemq.command.XATransactionId;
037import org.apache.activemq.store.jdbc.JDBCAdapter;
038import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
039import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
040import org.apache.activemq.store.jdbc.JDBCMessageStore;
041import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
042import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
043import org.apache.activemq.store.jdbc.Statements;
044import org.apache.activemq.store.jdbc.TransactionContext;
045import org.apache.activemq.util.DataByteArrayOutputStream;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import static javax.xml.bind.DatatypeConverter.parseBase64Binary;
050import static javax.xml.bind.DatatypeConverter.printBase64Binary;
051
052/**
053 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
054 * encouraged to override the default implementation of methods to account for differences in JDBC Driver
055 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
056 * The databases/JDBC drivers that use this adapter are:
057 * <ul>
058 * <li></li>
059 * </ul>
060 *
061 * @org.apache.xbean.XBean element="defaultJDBCAdapter"
062 *
063 *
064 */
065public class DefaultJDBCAdapter implements JDBCAdapter {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
    // Default value for maxRows, taken from the prefetch policy's MAX_PREFETCH_SIZE constant.
    public static final int MAX_ROWS = org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE;
    // Supplies the SQL text (and table names) used by every operation in this adapter.
    protected Statements statements;
    // When true, the add/remove/last-ack operations cache their PreparedStatement on the
    // TransactionContext and call addBatch() instead of executing and closing immediately.
    private boolean batchStatements = true;
    // This is deprecated and should be removed in a future release.
    // NOTE(review): the name differs from batchStatements only by a missing 'e'; it is not read
    // by any of the operations visible in this part of the class.
    protected boolean batchStatments = true;
    // NOTE(review): not read in the part of the class reviewed here; presumably toggles
    // priority-aware behaviour - confirm against its accessors/users.
    protected boolean prioritizedMessages;
    // Read side is held by the regular message/subscription operations; the write side is held
    // by doCreateTables and doDropTables so schema changes run exclusively.
    protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
    // Upper bound passed (via Math.min) to Statement.setMaxRows when paging durable-subscriber messages.
    protected int maxRows = MAX_ROWS;
075
076    protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
077        s.setBytes(index, data);
078    }
079
080    protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
081        return rs.getBytes(index);
082    }
083
084    @Override
085    public void doCreateTables(TransactionContext c) throws SQLException, IOException {
086        Statement s = null;
087        cleanupExclusiveLock.writeLock().lock();
088        try {
089            // Check to see if the table already exists. If it does, then don't
090            // log warnings during startup.
091            // Need to run the scripts anyways since they may contain ALTER
092            // statements that upgrade a previous version
093            // of the table
094            boolean alreadyExists = false;
095            ResultSet rs = null;
096            try {
097                rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
098                        new String[] { "TABLE" });
099                alreadyExists = rs.next();
100            } catch (Throwable ignore) {
101            } finally {
102                close(rs);
103            }
104            s = c.getConnection().createStatement();
105            String[] createStatments = this.statements.getCreateSchemaStatements();
106            for (int i = 0; i < createStatments.length; i++) {
107                // This will fail usually since the tables will be
108                // created already.
109                try {
110                    LOG.debug("Executing SQL: " + createStatments[i]);
111                    s.execute(createStatments[i]);
112                } catch (SQLException e) {
113                    if (alreadyExists) {
114                        LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
115                                + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
116                                + " Vendor code: " + e.getErrorCode());
117                    } else {
118                        LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
119                                + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
120                                + " Vendor code: " + e.getErrorCode());
121                        JDBCPersistenceAdapter.log("Failure details: ", e);
122                    }
123                }
124            }
125
126            // if autoCommit used do not call commit
127            if(!c.getConnection().getAutoCommit()){
128                c.getConnection().commit();
129            }
130
131        } finally {
132            cleanupExclusiveLock.writeLock().unlock();
133            try {
134                s.close();
135            } catch (Throwable e) {
136            }
137        }
138    }
139
140    @Override
141    public void doDropTables(TransactionContext c) throws SQLException, IOException {
142        Statement s = null;
143        cleanupExclusiveLock.writeLock().lock();
144        try {
145            s = c.getConnection().createStatement();
146            String[] dropStatments = this.statements.getDropSchemaStatements();
147            for (int i = 0; i < dropStatments.length; i++) {
148                // This will fail usually since the tables will be
149                // created already.
150                try {
151                    LOG.debug("Executing SQL: " + dropStatments[i]);
152                    s.execute(dropStatments[i]);
153                } catch (SQLException e) {
154                    LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
155                            + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
156                            + e.getErrorCode());
157                    JDBCPersistenceAdapter.log("Failure details: ", e);
158                }
159            }
160            // if autoCommit used do not call commit
161            if(!c.getConnection().getAutoCommit()){
162               c.getConnection().commit();
163            }
164        } finally {
165            cleanupExclusiveLock.writeLock().unlock();
166            try {
167                s.close();
168            } catch (Throwable e) {
169            }
170        }
171    }
172
173    @Override
174    public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
175        PreparedStatement s = null;
176        ResultSet rs = null;
177        cleanupExclusiveLock.readLock().lock();
178        try {
179            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
180            rs = s.executeQuery();
181            long seq1 = 0;
182            if (rs.next()) {
183                seq1 = rs.getLong(1);
184            }
185            rs.close();
186            s.close();
187            s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
188            rs = s.executeQuery();
189            long seq2 = 0;
190            if (rs.next()) {
191                seq2 = rs.getLong(1);
192            }
193            long seq = Math.max(seq1, seq2);
194            return seq;
195        } finally {
196            cleanupExclusiveLock.readLock().unlock();
197            close(rs);
198            close(s);
199        }
200    }
201
202    @Override
203    public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
204        PreparedStatement s = null;
205        ResultSet rs = null;
206        cleanupExclusiveLock.readLock().lock();
207        try {
208            s = c.getConnection().prepareStatement(
209                    this.statements.getFindMessageByIdStatement());
210            s.setLong(1, storeSequenceId);
211            rs = s.executeQuery();
212            if (!rs.next()) {
213                return null;
214            }
215            return getBinaryData(rs, 1);
216        } finally {
217            cleanupExclusiveLock.readLock().unlock();
218            close(rs);
219            close(s);
220        }
221    }
222
223
224    /**
225     * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
226     */
227    @Override
228    public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
229                             long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
230        PreparedStatement s = c.getAddMessageStatement();
231        cleanupExclusiveLock.readLock().lock();
232        try {
233            if (s == null) {
234                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
235                if (this.batchStatements) {
236                    c.setAddMessageStatement(s);
237                }
238            }
239            s.setLong(1, sequence);
240            s.setString(2, messageID.getProducerId().toString());
241            s.setLong(3, messageID.getProducerSequenceId());
242            s.setString(4, destination.getQualifiedName());
243            s.setLong(5, expiration);
244            s.setLong(6, priority);
245            setBinaryData(s, 7, data);
246            if (xid != null) {
247                byte[] xidVal = xid.getEncodedXidBytes();
248                xidVal[0] = '+';
249                String xidString = printBase64Binary(xidVal);
250                s.setString(8, xidString);
251            } else {
252                s.setString(8, null);
253            }
254            if (this.batchStatements) {
255                s.addBatch();
256            } else if (s.executeUpdate() != 1) {
257                throw new SQLException("Failed add a message");
258            }
259        } finally {
260            cleanupExclusiveLock.readLock().unlock();
261            if (!this.batchStatements) {
262                if (s != null) {
263                    s.close();
264                }
265            }
266        }
267    }
268
269    @Override
270    public void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException {
271        PreparedStatement s = null;
272        cleanupExclusiveLock.readLock().lock();
273        try {
274            s = c.getConnection().prepareStatement(this.statements.getUpdateMessageStatement());
275            setBinaryData(s, 1, data);
276            s.setString(2, id.getProducerId().toString());
277            s.setLong(3, id.getProducerSequenceId());
278            s.setString(4, destination.getQualifiedName());
279            if (s.executeUpdate() != 1) {
280                throw new IOException("Could not update message: " + id + " in " + destination);
281            }
282        } finally {
283            cleanupExclusiveLock.readLock().unlock();
284            close(s);
285        }
286    }
287
288
289    @Override
290    public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
291            long expirationTime, String messageRef) throws SQLException, IOException {
292        PreparedStatement s = c.getAddMessageStatement();
293        cleanupExclusiveLock.readLock().lock();
294        try {
295            if (s == null) {
296                s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
297                if (this.batchStatements) {
298                    c.setAddMessageStatement(s);
299                }
300            }
301            s.setLong(1, messageID.getBrokerSequenceId());
302            s.setString(2, messageID.getProducerId().toString());
303            s.setLong(3, messageID.getProducerSequenceId());
304            s.setString(4, destination.getQualifiedName());
305            s.setLong(5, expirationTime);
306            s.setString(6, messageRef);
307            if (this.batchStatements) {
308                s.addBatch();
309            } else if (s.executeUpdate() != 1) {
310                throw new SQLException("Failed add a message");
311            }
312        } finally {
313            cleanupExclusiveLock.readLock().unlock();
314            if (!this.batchStatements) {
315                s.close();
316            }
317        }
318    }
319
320    @Override
321    public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
322        PreparedStatement s = null;
323        ResultSet rs = null;
324        cleanupExclusiveLock.readLock().lock();
325        try {
326            s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
327            s.setString(1, messageID.getProducerId().toString());
328            s.setLong(2, messageID.getProducerSequenceId());
329            s.setString(3, destination.getQualifiedName());
330            rs = s.executeQuery();
331            if (!rs.next()) {
332                return new long[]{0,0};
333            }
334            return new long[]{rs.getLong(1), rs.getLong(2)};
335        } finally {
336            cleanupExclusiveLock.readLock().unlock();
337            close(rs);
338            close(s);
339        }
340    }
341
342    @Override
343    public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
344        PreparedStatement s = null;
345        ResultSet rs = null;
346        cleanupExclusiveLock.readLock().lock();
347        try {
348            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
349            s.setString(1, id.getProducerId().toString());
350            s.setLong(2, id.getProducerSequenceId());
351            rs = s.executeQuery();
352            if (!rs.next()) {
353                return null;
354            }
355            return getBinaryData(rs, 1);
356        } finally {
357            cleanupExclusiveLock.readLock().unlock();
358            close(rs);
359            close(s);
360        }
361    }
362
363    @Override
364    public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
365        PreparedStatement s = null;
366        ResultSet rs = null;
367        cleanupExclusiveLock.readLock().lock();
368        try {
369            s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
370            s.setLong(1, seq);
371            rs = s.executeQuery();
372            if (!rs.next()) {
373                return null;
374            }
375            return rs.getString(1);
376        } finally {
377            cleanupExclusiveLock.readLock().unlock();
378            close(rs);
379            close(s);
380        }
381    }
382
383    /**
384     * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
385     */
386    @Override
387    public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException {
388        PreparedStatement s = c.getRemovedMessageStatement();
389        cleanupExclusiveLock.readLock().lock();
390        try {
391            if (s == null) {
392                s = c.getConnection().prepareStatement(xid == null ?
393                        this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement());
394                if (this.batchStatements) {
395                    c.setRemovedMessageStatement(s);
396                }
397            }
398            if (xid == null) {
399                s.setLong(1, seq);
400            } else {
401                byte[] xidVal = xid.getEncodedXidBytes();
402                xidVal[0] = '-';
403                String xidString = printBase64Binary(xidVal);
404                s.setString(1, xidString);
405                s.setLong(2, seq);
406            }
407            if (this.batchStatements) {
408                s.addBatch();
409            } else if (s.executeUpdate() != 1) {
410                throw new SQLException("Failed to remove message seq: " + seq);
411            }
412        } finally {
413            cleanupExclusiveLock.readLock().unlock();
414            if (!this.batchStatements && s != null) {
415                s.close();
416            }
417        }
418    }
419
420    @Override
421    public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
422            throws Exception {
423        PreparedStatement s = null;
424        ResultSet rs = null;
425        cleanupExclusiveLock.readLock().lock();
426        try {
427            s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
428            s.setString(1, destination.getQualifiedName());
429            rs = s.executeQuery();
430            if (this.statements.isUseExternalMessageReferences()) {
431                while (rs.next()) {
432                    if (!listener.recoverMessageReference(rs.getString(2))) {
433                        break;
434                    }
435                }
436            } else {
437                while (rs.next()) {
438                    if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
439                        break;
440                    }
441                }
442            }
443        } finally {
444            cleanupExclusiveLock.readLock().unlock();
445            close(rs);
446            close(s);
447        }
448    }
449
450    @Override
451    public void doMessageIdScan(TransactionContext c, int limit,
452            JDBCMessageIdScanListener listener) throws SQLException, IOException {
453        PreparedStatement s = null;
454        ResultSet rs = null;
455        cleanupExclusiveLock.readLock().lock();
456        try {
457            s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
458            s.setMaxRows(limit);
459            rs = s.executeQuery();
460            // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
461            LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
462            while (rs.next()) {
463                reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
464            }
465            if (LOG.isDebugEnabled()) {
466                LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
467            }
468            for (MessageId id : reverseOrderIds) {
469                listener.messageId(id);
470            }
471        } finally {
472            cleanupExclusiveLock.readLock().unlock();
473            close(rs);
474            close(s);
475        }
476    }
477
478    @Override
479    public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
480                                         String subscriptionName, long seq, long priority) throws SQLException, IOException {
481        PreparedStatement s = c.getUpdateLastAckStatement();
482        cleanupExclusiveLock.readLock().lock();
483        try {
484            if (s == null) {
485                s = c.getConnection().prepareStatement(xid == null ?
486                        this.statements.getUpdateDurableLastAckWithPriorityStatement() :
487                        this.statements.getUpdateDurableLastAckWithPriorityInTxStatement());
488                if (this.batchStatements) {
489                    c.setUpdateLastAckStatement(s);
490                }
491            }
492            if (xid != null) {
493                byte[] xidVal = encodeXid(xid, seq, priority);
494                String xidString = printBase64Binary(xidVal);
495                s.setString(1, xidString);
496            } else {
497                s.setLong(1, seq);
498            }
499            s.setString(2, destination.getQualifiedName());
500            s.setString(3, clientId);
501            s.setString(4, subscriptionName);
502            s.setLong(5, priority);
503            if (this.batchStatements) {
504                s.addBatch();
505            } else if (s.executeUpdate() != 1) {
506                throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName);
507            }
508        } finally {
509            cleanupExclusiveLock.readLock().unlock();
510            if (!this.batchStatements) {
511                close(s);
512            }
513        }
514    }
515
516
517    @Override
518    public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
519                             String subscriptionName, long seq, long priority) throws SQLException, IOException {
520        PreparedStatement s = c.getUpdateLastAckStatement();
521        cleanupExclusiveLock.readLock().lock();
522        try {
523            if (s == null) {
524                s = c.getConnection().prepareStatement(xid == null ?
525                        this.statements.getUpdateDurableLastAckStatement() :
526                        this.statements.getUpdateDurableLastAckInTxStatement());
527                if (this.batchStatements) {
528                    c.setUpdateLastAckStatement(s);
529                }
530            }
531            if (xid != null) {
532                byte[] xidVal = encodeXid(xid, seq, priority);
533                String xidString = printBase64Binary(xidVal);
534                s.setString(1, xidString);
535            } else {
536                s.setLong(1, seq);
537            }
538            s.setString(2, destination.getQualifiedName());
539            s.setString(3, clientId);
540            s.setString(4, subscriptionName);
541
542            if (this.batchStatements) {
543                s.addBatch();
544            } else if (s.executeUpdate() != 1) {
545                throw new IOException("Could not update last ack seq : "
546                            + seq + ", for sub: " + subscriptionName);
547            }
548        } finally {
549            cleanupExclusiveLock.readLock().unlock();
550            if (!this.batchStatements) {
551                close(s);
552            }
553        }
554    }
555
556    private byte[] encodeXid(XATransactionId xid, long seq, long priority) {
557        byte[] xidVal = xid.getEncodedXidBytes();
558        // encode the update
559        DataByteArrayOutputStream outputStream = xid.internalOutputStream();
560        outputStream.position(1);
561        outputStream.writeLong(seq);
562        outputStream.writeByte(Long.valueOf(priority).byteValue());
563        return xidVal;
564    }
565
566    @Override
567    public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException {
568        PreparedStatement s = null;
569        cleanupExclusiveLock.readLock().lock();
570        try {
571            s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement());
572            s.setString(1, destination.getQualifiedName());
573            s.setString(2, clientId);
574            s.setString(3, subName);
575            s.setLong(4, priority);
576            if (s.executeUpdate() != 1) {
577                throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName);
578            }
579        } finally {
580            cleanupExclusiveLock.readLock().unlock();
581            close(s);
582        }
583    }
584
585    @Override
586    public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
587            String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
588        // dumpTables(c,
589        // destination.getQualifiedName(),clientId,subscriptionName);
590        PreparedStatement s = null;
591        ResultSet rs = null;
592        cleanupExclusiveLock.readLock().lock();
593        try {
594            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
595            s.setString(1, destination.getQualifiedName());
596            s.setString(2, clientId);
597            s.setString(3, subscriptionName);
598            rs = s.executeQuery();
599            if (this.statements.isUseExternalMessageReferences()) {
600                while (rs.next()) {
601                    if (!listener.recoverMessageReference(rs.getString(2))) {
602                        break;
603                    }
604                }
605            } else {
606                while (rs.next()) {
607                    if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
608                        break;
609                    }
610                }
611            }
612        } finally {
613            cleanupExclusiveLock.readLock().unlock();
614            close(rs);
615            close(s);
616        }
617    }
618
619    @Override
620    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
621            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
622
623        PreparedStatement s = null;
624        ResultSet rs = null;
625        cleanupExclusiveLock.readLock().lock();
626        try {
627            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
628            s.setMaxRows(Math.min(maxReturned * 2, maxRows));
629            s.setString(1, destination.getQualifiedName());
630            s.setString(2, clientId);
631            s.setString(3, subscriptionName);
632            s.setLong(4, seq);
633            rs = s.executeQuery();
634            int count = 0;
635            if (this.statements.isUseExternalMessageReferences()) {
636                while (rs.next() && count < maxReturned) {
637                    if (listener.recoverMessageReference(rs.getString(1))) {
638                        count++;
639                    }
640                }
641            } else {
642                while (rs.next() && count < maxReturned) {
643                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
644                        count++;
645                    }
646                }
647            }
648        } finally {
649            cleanupExclusiveLock.readLock().unlock();
650            close(rs);
651            close(s);
652        }
653    }
654
655    @Override
656    public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
657            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
658
659        PreparedStatement s = null;
660        ResultSet rs = null;
661        cleanupExclusiveLock.readLock().lock();
662        try {
663            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
664            s.setMaxRows(Math.min(maxReturned * 2, maxRows));
665            s.setString(1, destination.getQualifiedName());
666            s.setString(2, clientId);
667            s.setString(3, subscriptionName);
668            s.setLong(4, seq);
669            s.setLong(5, priority);
670            rs = s.executeQuery();
671            int count = 0;
672            if (this.statements.isUseExternalMessageReferences()) {
673                while (rs.next() && count < maxReturned) {
674                    if (listener.recoverMessageReference(rs.getString(1))) {
675                        count++;
676                    }
677                }
678            } else {
679                while (rs.next() && count < maxReturned) {
680                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
681                        count++;
682                    }
683                }
684            }
685        } finally {
686            cleanupExclusiveLock.readLock().unlock();
687            close(rs);
688            close(s);
689        }
690    }
691
692    @Override
693    public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
694            String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
695        PreparedStatement s = null;
696        ResultSet rs = null;
697        int result = 0;
698        cleanupExclusiveLock.readLock().lock();
699        try {
700            if (isPrioritizedMessages) {
701                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
702            } else {
703                s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());
704            }
705            s.setString(1, destination.getQualifiedName());
706            s.setString(2, clientId);
707            s.setString(3, subscriptionName);
708            rs = s.executeQuery();
709            if (rs.next()) {
710                result = rs.getInt(1);
711            }
712        } finally {
713            cleanupExclusiveLock.readLock().unlock();
714            close(rs);
715            close(s);
716        }
717        return result;
718    }
719
720    /**
721     * @param c
722     * @param info
723     * @param retroactive
724     * @throws SQLException
725     * @throws IOException
726     */
727    @Override
728    public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
729            throws SQLException, IOException {
730        // dumpTables(c, destination.getQualifiedName(), clientId,
731        // subscriptionName);
732        PreparedStatement s = null;
733        cleanupExclusiveLock.readLock().lock();
734        try {
735            long lastMessageId = -1;
736            if (!retroactive) {
737                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
738                ResultSet rs = null;
739                try {
740                    rs = s.executeQuery();
741                    if (rs.next()) {
742                        lastMessageId = rs.getLong(1);
743                    }
744                } finally {
745                    close(rs);
746                    close(s);
747                }
748            }
749            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
750            int maxPriority = 1;
751            if (isPrioritizedMessages) {
752                maxPriority = 10;
753            }
754
755            for (int priority = 0; priority < maxPriority; priority++) {
756                s.setString(1, info.getDestination().getQualifiedName());
757                s.setString(2, info.getClientId());
758                s.setString(3, info.getSubscriptionName());
759                s.setString(4, info.getSelector());
760                s.setLong(5, lastMessageId);
761                s.setString(6, info.getSubscribedDestination().getQualifiedName());
762                s.setLong(7, priority);
763
764                if (s.executeUpdate() != 1) {
765                    throw new IOException("Could not create durable subscription for: " + info.getClientId());
766                }
767            }
768
769        } finally {
770            cleanupExclusiveLock.readLock().unlock();
771            close(s);
772        }
773    }
774
775    @Override
776    public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
777            String clientId, String subscriptionName) throws SQLException, IOException {
778        PreparedStatement s = null;
779        ResultSet rs = null;
780        cleanupExclusiveLock.readLock().lock();
781        try {
782            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
783            s.setString(1, destination.getQualifiedName());
784            s.setString(2, clientId);
785            s.setString(3, subscriptionName);
786            rs = s.executeQuery();
787            if (!rs.next()) {
788                return null;
789            }
790            SubscriptionInfo subscription = new SubscriptionInfo();
791            subscription.setDestination(destination);
792            subscription.setClientId(clientId);
793            subscription.setSubscriptionName(subscriptionName);
794            subscription.setSelector(rs.getString(1));
795            subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
796                    ActiveMQDestination.QUEUE_TYPE));
797            return subscription;
798        } finally {
799            cleanupExclusiveLock.readLock().unlock();
800            close(rs);
801            close(s);
802        }
803    }
804
805    @Override
806    public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
807            throws SQLException, IOException {
808        PreparedStatement s = null;
809        ResultSet rs = null;
810        cleanupExclusiveLock.readLock().lock();
811        try {
812            s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
813            s.setString(1, destination.getQualifiedName());
814            rs = s.executeQuery();
815            ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
816            while (rs.next()) {
817                SubscriptionInfo subscription = new SubscriptionInfo();
818                subscription.setDestination(destination);
819                subscription.setSelector(rs.getString(1));
820                subscription.setSubscriptionName(rs.getString(2));
821                subscription.setClientId(rs.getString(3));
822                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
823                        ActiveMQDestination.QUEUE_TYPE));
824                rc.add(subscription);
825            }
826            return rc.toArray(new SubscriptionInfo[rc.size()]);
827        } finally {
828            cleanupExclusiveLock.readLock().unlock();
829            close(rs);
830            close(s);
831        }
832    }
833
834    @Override
835    public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
836            IOException {
837        PreparedStatement s = null;
838        cleanupExclusiveLock.readLock().lock();
839        try {
840            s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
841            s.setString(1, destinationName.getQualifiedName());
842            s.executeUpdate();
843            s.close();
844            s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
845            s.setString(1, destinationName.getQualifiedName());
846            s.executeUpdate();
847        } finally {
848            cleanupExclusiveLock.readLock().unlock();
849            close(s);
850        }
851    }
852
853    @Override
854    public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
855            String subscriptionName) throws SQLException, IOException {
856        PreparedStatement s = null;
857        cleanupExclusiveLock.readLock().lock();
858        try {
859            s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
860            s.setString(1, destination.getQualifiedName());
861            s.setString(2, clientId);
862            s.setString(3, subscriptionName);
863            s.executeUpdate();
864        } finally {
865            cleanupExclusiveLock.readLock().unlock();
866            close(s);
867        }
868    }
869
870    char priorityIterator = 0; // unsigned
871    @Override
872    public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
873        PreparedStatement s = null;
874        cleanupExclusiveLock.writeLock().lock();
875        try {
876            LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
877            s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
878            int priority = priorityIterator++%10;
879            s.setInt(1, priority);
880            s.setInt(2, priority);
881            int i = s.executeUpdate();
882            LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
883        } finally {
884            cleanupExclusiveLock.writeLock().unlock();
885            close(s);
886        }
887    }
888
889    @Override
890    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
891            String clientId, String subscriberName) throws SQLException, IOException {
892        PreparedStatement s = null;
893        ResultSet rs = null;
894        long result = -1;
895        cleanupExclusiveLock.readLock().lock();
896        try {
897            s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
898            s.setString(1, destination.getQualifiedName());
899            s.setString(2, clientId);
900            s.setString(3, subscriberName);
901            rs = s.executeQuery();
902            if (rs.next()) {
903                result = rs.getLong(1);
904                if (result == 0 && rs.wasNull()) {
905                    result = -1;
906                }
907            }
908        } finally {
909            cleanupExclusiveLock.readLock().unlock();
910            close(rs);
911            close(s);
912        }
913        return result;
914    }
915
916    protected static void close(PreparedStatement s) {
917        try {
918            s.close();
919        } catch (Throwable e) {
920        }
921    }
922
923    protected static void close(ResultSet rs) {
924        try {
925            rs.close();
926        } catch (Throwable e) {
927        }
928    }
929
930    @Override
931    public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
932        HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
933        PreparedStatement s = null;
934        ResultSet rs = null;
935        cleanupExclusiveLock.readLock().lock();
936        try {
937            s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
938            rs = s.executeQuery();
939            while (rs.next()) {
940                rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
941            }
942        } finally {
943            cleanupExclusiveLock.readLock().unlock();
944            close(rs);
945            close(s);
946        }
947        return rc;
948    }
949
950    /**
951     * @return true if batchStatements
952     */
953    public boolean isBatchStatements() {
954        return batchStatements;
955    }
956
957    /**
958     * Set the number of statements to process as a single batch DB update
959     * @param batchStatements
960     */
961    public void setBatchStatements(boolean batchStatements) {
962        this.batchStatements = batchStatements;
963        // The next lines are deprecated and should be removed in a future release
964        // and is here in case someone created their own
965       // this.batchStatments = batchStatements;
966    }
967
968    // Note - remove batchStatment in future distributions.  Here for backward compatibility
969    /**
970     * @return true if batchStements
971     */
972    public boolean isBatchStatments() {
973        return this.batchStatements;
974    }
975
976    /**
977     * This value batchStatments is deprecated and will be removed in a future release.  Use batchStatements instead (Note the 'e' in Statement)"
978     * @deprecated
979     * @param batchStatments
980     */
981    public void setBatchStatments(boolean batchStatments) {
982        LOG.warn("batchStatments is deprecated and will be removed in a future release.  Use batchStatements instead (Note the 'e' in Statement)");
983        this.batchStatements = batchStatments;
984        this.batchStatments = batchStatments;
985    }
986
987    @Override
988    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
989        this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
990    }
991
992    /**
993     * @return the statements
994     */
995    public Statements getStatements() {
996        return this.statements;
997    }
998
999    @Override
1000    public void setStatements(Statements statements) {
1001        this.statements = statements;
1002    }
1003
1004    @Override
1005    public int getMaxRows() {
1006        return maxRows;
1007    }
1008
1009    /**
1010     * the max value for statement maxRows, used to limit jdbc queries
1011     */
1012    @Override
1013    public void setMaxRows(int maxRows) {
1014        this.maxRows = maxRows;
1015    }
1016
1017    @Override
1018    public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
1019        PreparedStatement s = null;
1020        cleanupExclusiveLock.readLock().lock();
1021        try {
1022            s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
1023            s.setString(1, destination.getQualifiedName());
1024            s.setString(2, destination.getQualifiedName());
1025            s.setString(3, destination.getQualifiedName());
1026            s.setString(4, null);
1027            s.setLong(5, 0);
1028            s.setString(6, destination.getQualifiedName());
1029            s.setLong(7, 11);  // entry out of priority range
1030
1031            if (s.executeUpdate() != 1) {
1032                throw new IOException("Could not create ack record for destination: " + destination);
1033            }
1034        } finally {
1035            cleanupExclusiveLock.readLock().unlock();
1036            close(s);
1037        }
1038    }
1039
1040    @Override
1041    public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException {
1042        PreparedStatement s = null;
1043        ResultSet rs = null;
1044        cleanupExclusiveLock.readLock().lock();
1045        try {
1046            s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement());
1047            rs = s.executeQuery();
1048            while (rs.next()) {
1049                long id = rs.getLong(1);
1050                String encodedString = rs.getString(2);
1051                byte[] encodedXid = parseBase64Binary(encodedString);
1052                if (encodedXid[0] == '+') {
1053                    jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3));
1054                } else {
1055                    jdbcMemoryTransactionStore.recoverAck(id, encodedXid, getBinaryData(rs, 3));
1056                }
1057            }
1058
1059            close(rs);
1060            close(s);
1061
1062            s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement());
1063            rs = s.executeQuery();
1064            while (rs.next()) {
1065                String encodedString = rs.getString(1);
1066                byte[] encodedXid = parseBase64Binary(encodedString);
1067                String destination = rs.getString(2);
1068                String subName = rs.getString(3);
1069                String subId = rs.getString(4);
1070                jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
1071                        ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE),
1072                        subName, subId);
1073            }
1074        } finally {
1075            close(rs);
1076            cleanupExclusiveLock.readLock().unlock();
1077            close(s);
1078        }
1079    }
1080
1081    @Override
1082    public void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException {
1083        PreparedStatement s = null;
1084        cleanupExclusiveLock.readLock().lock();
1085        try {
1086            s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement());
1087            s.setLong(1, sequence);
1088            s.setLong(2, preparedSequence);
1089            if (s.executeUpdate() != 1) {
1090                throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence);
1091            }
1092        } finally {
1093            cleanupExclusiveLock.readLock().unlock();
1094            close(s);
1095        }
1096    }
1097
1098
1099    @Override
1100    public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
1101            IOException {
1102        PreparedStatement s = null;
1103        ResultSet rs = null;
1104        int result = 0;
1105        cleanupExclusiveLock.readLock().lock();
1106        try {
1107            s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
1108            s.setString(1, destination.getQualifiedName());
1109            rs = s.executeQuery();
1110            if (rs.next()) {
1111                result = rs.getInt(1);
1112            }
1113        } finally {
1114            cleanupExclusiveLock.readLock().unlock();
1115            close(rs);
1116            close(s);
1117        }
1118        return result;
1119    }
1120
1121    @Override
1122    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long[] lastRecoveredEntries,
1123            long maxSeq, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
1124        PreparedStatement s = null;
1125        ResultSet rs = null;
1126        cleanupExclusiveLock.readLock().lock();
1127        try {
1128            if (isPrioritizedMessages) {
1129                s = c.getConnection().prepareStatement(limitQuery(this.statements.getFindNextMessagesByPriorityStatement()));
1130            } else {
1131                s = c.getConnection().prepareStatement(limitQuery(this.statements.getFindNextMessagesStatement()));
1132            }
1133            s.setMaxRows(Math.min(maxReturned, maxRows));
1134            s.setString(1, destination.getQualifiedName());
1135            s.setLong(2, maxSeq);
1136            int paramId = 3;
1137            if (isPrioritizedMessages) {
1138                for (int i=9;i>=0;i--) {
1139                    s.setLong(paramId++, lastRecoveredEntries[i]);
1140                }
1141            } else {
1142                s.setLong(paramId, lastRecoveredEntries[0]);
1143            }
1144            rs = s.executeQuery();
1145            int count = 0;
1146            if (this.statements.isUseExternalMessageReferences()) {
1147                while (rs.next() && count < maxReturned) {
1148                    if (listener.recoverMessageReference(rs.getString(1))) {
1149                        count++;
1150                    } else {
1151                        LOG.debug("Stopped recover next messages");
1152                        break;
1153                    }
1154                }
1155            } else {
1156                while (rs.next() && count < maxReturned) {
1157                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
1158                        count++;
1159                    } else {
1160                        LOG.debug("Stopped recover next messages");
1161                        break;
1162                    }
1163                }
1164            }
1165        } catch (Exception e) {
1166            LOG.warn("Exception recovering next messages", e);
1167        } finally {
1168            cleanupExclusiveLock.readLock().unlock();
1169            close(rs);
1170            close(s);
1171        }
1172    }
1173
1174    @Override
1175    public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1176            throws SQLException, IOException {
1177        PreparedStatement s = null;
1178        ResultSet rs = null;
1179        cleanupExclusiveLock.readLock().lock();
1180        try {
1181            s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1182            s.setString(1, id.toString());
1183            rs = s.executeQuery();
1184            long seq = -1;
1185            if (rs.next()) {
1186                seq = rs.getLong(1);
1187            }
1188            return seq;
1189        } finally {
1190            cleanupExclusiveLock.readLock().unlock();
1191            close(rs);
1192            close(s);
1193        }
1194    }
1195
1196    public static void dumpTables(Connection c, String destinationName, String clientId, String
1197      subscriptionName) throws SQLException {
1198        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
1199        printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
1200        PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM "
1201                + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D "
1202                + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
1203                + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
1204                + " ORDER BY M.ID");
1205      s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
1206      printQuery(s,System.out); }
1207
1208    public static void dumpTables(java.sql.Connection c) throws SQLException {
1209        printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_MSGS", System.out);
1210
1211        //printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_ACKS", System.out);
1212
1213        //printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out);
1214        //printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
1215    }
1216
1217    public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
1218            throws SQLException {
1219        printQuery(c.prepareStatement(query), out);
1220    }
1221
1222    public static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
1223            throws SQLException {
1224
1225        ResultSet set = null;
1226        try {
1227            set = s.executeQuery();
1228            java.sql.ResultSetMetaData metaData = set.getMetaData();
1229            for (int i = 1; i <= metaData.getColumnCount(); i++) {
1230                if (i == 1)
1231                    out.print("||");
1232                out.print(metaData.getColumnName(i) + "||");
1233            }
1234            out.println();
1235            while (set.next()) {
1236                for (int i = 1; i <= metaData.getColumnCount(); i++) {
1237                    if (i == 1)
1238                        out.print("|");
1239                    out.print(set.getString(i) + "|");
1240                }
1241                out.println();
1242            }
1243        } finally {
1244            try {
1245                set.close();
1246            } catch (Throwable ignore) {
1247            }
1248            try {
1249                s.close();
1250            } catch (Throwable ignore) {
1251            }
1252        }
1253    }
1254
1255    @Override
1256    public String limitQuery(String query) {
1257        return query;
1258    }
1259}