001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc.adapter;
018    
019    import java.io.IOException;
020    import java.sql.Connection;
021    import java.sql.PreparedStatement;
022    import java.sql.ResultSet;
023    import java.sql.SQLException;
024    import java.sql.Statement;
025    import java.util.ArrayList;
026    import java.util.HashSet;
027    import java.util.LinkedList;
028    import java.util.Set;
029    import java.util.concurrent.locks.ReadWriteLock;
030    import java.util.concurrent.locks.ReentrantReadWriteLock;
031    
032    import org.apache.activemq.broker.region.BaseDestination;
033    import org.apache.activemq.command.ActiveMQDestination;
034    import org.apache.activemq.command.MessageId;
035    import org.apache.activemq.command.ProducerId;
036    import org.apache.activemq.command.SubscriptionInfo;
037    import org.apache.activemq.command.XATransactionId;
038    import org.apache.activemq.store.jdbc.JDBCAdapter;
039    import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
040    import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
041    import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
042    import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
043    import org.apache.activemq.store.jdbc.Statements;
044    import org.apache.activemq.store.jdbc.TransactionContext;
045    import org.apache.activemq.util.DataByteArrayOutputStream;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
051     * encouraged to override the default implementation of methods to account for differences in JDBC Driver
052     * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
053     * The databases/JDBC drivers that use this adapter are:
054     * <ul>
055     * <li></li>
056     * </ul>
057     * 
058     * @org.apache.xbean.XBean element="defaultJDBCAdapter"
059     * 
060     * 
061     */
062    public class DefaultJDBCAdapter implements JDBCAdapter {
063        private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
064        public static final int MAX_ROWS = org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE;
065        protected Statements statements;
066        protected boolean batchStatments = true;
067        protected boolean prioritizedMessages;
068        protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
069        protected int maxRows = MAX_ROWS;
070    
071        protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
072            s.setBytes(index, data);
073        }
074    
075        protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
076            return rs.getBytes(index);
077        }
078    
079        public void doCreateTables(TransactionContext c) throws SQLException, IOException {
080            Statement s = null;
081            cleanupExclusiveLock.writeLock().lock();
082            try {
083                // Check to see if the table already exists. If it does, then don't
084                // log warnings during startup.
085                // Need to run the scripts anyways since they may contain ALTER
086                // statements that upgrade a previous version
087                // of the table
088                boolean alreadyExists = false;
089                ResultSet rs = null;
090                try {
091                    rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
092                            new String[] { "TABLE" });
093                    alreadyExists = rs.next();
094                } catch (Throwable ignore) {
095                } finally {
096                    close(rs);
097                }
098                s = c.getConnection().createStatement();
099                String[] createStatments = this.statements.getCreateSchemaStatements();
100                for (int i = 0; i < createStatments.length; i++) {
101                    // This will fail usually since the tables will be
102                    // created already.
103                    try {
104                        LOG.debug("Executing SQL: " + createStatments[i]);
105                        s.execute(createStatments[i]);
106                    } catch (SQLException e) {
107                        if (alreadyExists) {
108                            LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
109                                    + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
110                                    + " Vendor code: " + e.getErrorCode());
111                        } else {
112                            LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
113                                    + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
114                                    + " Vendor code: " + e.getErrorCode());
115                            JDBCPersistenceAdapter.log("Failure details: ", e);
116                        }
117                    }
118                }
119                c.getConnection().commit();
120            } finally {
121                cleanupExclusiveLock.writeLock().unlock();
122                try {
123                    s.close();
124                } catch (Throwable e) {
125                }
126            }
127        }
128    
129        public void doDropTables(TransactionContext c) throws SQLException, IOException {
130            Statement s = null;
131            cleanupExclusiveLock.writeLock().lock();
132            try {
133                s = c.getConnection().createStatement();
134                String[] dropStatments = this.statements.getDropSchemaStatements();
135                for (int i = 0; i < dropStatments.length; i++) {
136                    // This will fail usually since the tables will be
137                    // created already.
138                    try {
139                        LOG.debug("Executing SQL: " + dropStatments[i]);
140                        s.execute(dropStatments[i]);
141                    } catch (SQLException e) {
142                        LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
143                                + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
144                                + e.getErrorCode());
145                        JDBCPersistenceAdapter.log("Failure details: ", e);
146                    }
147                }
148                c.getConnection().commit();
149            } finally {
150                cleanupExclusiveLock.writeLock().unlock();
151                try {
152                    s.close();
153                } catch (Throwable e) {
154                }
155            }
156        }
157    
158        public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
159            PreparedStatement s = null;
160            ResultSet rs = null;
161            cleanupExclusiveLock.readLock().lock();
162            try {
163                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
164                rs = s.executeQuery();
165                long seq1 = 0;
166                if (rs.next()) {
167                    seq1 = rs.getLong(1);
168                }
169                rs.close();
170                s.close();
171                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
172                rs = s.executeQuery();
173                long seq2 = 0;
174                if (rs.next()) {
175                    seq2 = rs.getLong(1);
176                }
177                long seq = Math.max(seq1, seq2);
178                return seq;
179            } finally {
180                cleanupExclusiveLock.readLock().unlock();
181                close(rs);
182                close(s);
183            }
184        }
185        
186        public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
187            PreparedStatement s = null;
188            ResultSet rs = null;
189            cleanupExclusiveLock.readLock().lock();
190            try {
191                s = c.getConnection().prepareStatement(
192                        this.statements.getFindMessageByIdStatement());
193                s.setLong(1, storeSequenceId);
194                rs = s.executeQuery();
195                if (!rs.next()) {
196                    return null;
197                }
198                return getBinaryData(rs, 1);
199            } finally {
200                cleanupExclusiveLock.readLock().unlock();
201                close(rs);
202                close(s);
203            }
204        }
205    
206    
207        /**
208         * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
209         */
210        public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
211                                 long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
212            PreparedStatement s = c.getAddMessageStatement();
213            cleanupExclusiveLock.readLock().lock();
214            try {
215                if (s == null) {
216                    s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
217                    if (this.batchStatments) {
218                        c.setAddMessageStatement(s);
219                    }
220                }
221                s.setLong(1, sequence);
222                s.setString(2, messageID.getProducerId().toString());
223                s.setLong(3, messageID.getProducerSequenceId());
224                s.setString(4, destination.getQualifiedName());
225                s.setLong(5, expiration);
226                s.setLong(6, priority);
227                setBinaryData(s, 7, data);
228                if (xid != null) {
229                    byte[] xidVal = xid.getEncodedXidBytes();
230                    xidVal[0] = '+';
231                    setBinaryData(s, 8, xidVal);
232                } else {
233                    setBinaryData(s, 8, null);
234                }
235                if (this.batchStatments) {
236                    s.addBatch();
237                } else if (s.executeUpdate() != 1) {
238                    throw new SQLException("Failed add a message");
239                }
240            } finally {
241                cleanupExclusiveLock.readLock().unlock();
242                if (!this.batchStatments) {
243                    if (s != null) {
244                        s.close();
245                    }
246                }
247            }
248        }
249    
250        public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
251                long expirationTime, String messageRef) throws SQLException, IOException {
252            PreparedStatement s = c.getAddMessageStatement();
253            cleanupExclusiveLock.readLock().lock();
254            try {
255                if (s == null) {
256                    s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
257                    if (this.batchStatments) {
258                        c.setAddMessageStatement(s);
259                    }
260                }
261                s.setLong(1, messageID.getBrokerSequenceId());
262                s.setString(2, messageID.getProducerId().toString());
263                s.setLong(3, messageID.getProducerSequenceId());
264                s.setString(4, destination.getQualifiedName());
265                s.setLong(5, expirationTime);
266                s.setString(6, messageRef);
267                if (this.batchStatments) {
268                    s.addBatch();
269                } else if (s.executeUpdate() != 1) {
270                    throw new SQLException("Failed add a message");
271                }
272            } finally {
273                cleanupExclusiveLock.readLock().unlock();
274                if (!this.batchStatments) {
275                    s.close();
276                }
277            }
278        }
279    
280        public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
281            PreparedStatement s = null;
282            ResultSet rs = null;
283            cleanupExclusiveLock.readLock().lock();
284            try {
285                s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
286                s.setString(1, messageID.getProducerId().toString());
287                s.setLong(2, messageID.getProducerSequenceId());
288                s.setString(3, destination.getQualifiedName());
289                rs = s.executeQuery();
290                if (!rs.next()) {
291                    return new long[]{0,0};
292                }
293                return new long[]{rs.getLong(1), rs.getLong(2)};
294            } finally {
295                cleanupExclusiveLock.readLock().unlock();
296                close(rs);
297                close(s);
298            }
299        }
300    
301        public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
302            PreparedStatement s = null;
303            ResultSet rs = null;
304            cleanupExclusiveLock.readLock().lock();
305            try {
306                s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
307                s.setString(1, id.getProducerId().toString());
308                s.setLong(2, id.getProducerSequenceId());
309                rs = s.executeQuery();
310                if (!rs.next()) {
311                    return null;
312                }
313                return getBinaryData(rs, 1);
314            } finally {
315                cleanupExclusiveLock.readLock().unlock();
316                close(rs);
317                close(s);
318            }
319        }
320    
321        public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
322            PreparedStatement s = null;
323            ResultSet rs = null;
324            cleanupExclusiveLock.readLock().lock();
325            try {
326                s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
327                s.setLong(1, seq);
328                rs = s.executeQuery();
329                if (!rs.next()) {
330                    return null;
331                }
332                return rs.getString(1);
333            } finally {
334                cleanupExclusiveLock.readLock().unlock();
335                close(rs);
336                close(s);
337            }
338        }
339    
340        /**
341         * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
342         */
343        public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException {
344            PreparedStatement s = c.getRemovedMessageStatement();
345            cleanupExclusiveLock.readLock().lock();
346            try {
347                if (s == null) {
348                    s = c.getConnection().prepareStatement(xid == null ?
349                            this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement());
350                    if (this.batchStatments) {
351                        c.setRemovedMessageStatement(s);
352                    }
353                }
354                if (xid == null) {
355                    s.setLong(1, seq);
356                } else {
357                    byte[] xidVal = xid.getEncodedXidBytes();
358                    xidVal[0] = '-';
359                    setBinaryData(s, 1, xidVal);
360                    s.setLong(2, seq);
361                }
362                if (this.batchStatments) {
363                    s.addBatch();
364                } else if (s.executeUpdate() != 1) {
365                    throw new SQLException("Failed to remove message");
366                }
367            } finally {
368                cleanupExclusiveLock.readLock().unlock();
369                if (!this.batchStatments && s != null) {
370                    s.close();
371                }
372            }
373        }
374    
375        public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
376                throws Exception {
377            PreparedStatement s = null;
378            ResultSet rs = null;
379            cleanupExclusiveLock.readLock().lock();
380            try {
381                s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
382                s.setString(1, destination.getQualifiedName());
383                rs = s.executeQuery();
384                if (this.statements.isUseExternalMessageReferences()) {
385                    while (rs.next()) {
386                        if (!listener.recoverMessageReference(rs.getString(2))) {
387                            break;
388                        }
389                    }
390                } else {
391                    while (rs.next()) {
392                        if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
393                            break;
394                        }
395                    }
396                }
397            } finally {
398                cleanupExclusiveLock.readLock().unlock();
399                close(rs);
400                close(s);
401            }
402        }
403    
404        public void doMessageIdScan(TransactionContext c, int limit, 
405                JDBCMessageIdScanListener listener) throws SQLException, IOException {
406            PreparedStatement s = null;
407            ResultSet rs = null;
408            cleanupExclusiveLock.readLock().lock();
409            try {
410                s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
411                s.setMaxRows(limit);
412                rs = s.executeQuery();
413                // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
414                LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
415                while (rs.next()) {
416                    reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
417                }
418                if (LOG.isDebugEnabled()) {
419                    LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
420                }
421                for (MessageId id : reverseOrderIds) {
422                    listener.messageId(id);
423                }
424            } finally {
425                cleanupExclusiveLock.readLock().unlock();
426                close(rs);
427                close(s);
428            }
429        }
430        
431        public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
432                                             String subscriptionName, long seq, long priority) throws SQLException, IOException {
433            PreparedStatement s = c.getUpdateLastAckStatement();
434            cleanupExclusiveLock.readLock().lock();
435            try {
436                if (s == null) {
437                    s = c.getConnection().prepareStatement(xid == null ?
438                            this.statements.getUpdateDurableLastAckWithPriorityStatement() :
439                            this.statements.getUpdateDurableLastAckWithPriorityInTxStatement());
440                    if (this.batchStatments) {
441                        c.setUpdateLastAckStatement(s);
442                    }
443                }
444                if (xid != null) {
445                    byte[] xidVal = encodeXid(xid, seq, priority);
446                    setBinaryData(s, 1, xidVal);
447                } else {
448                    s.setLong(1, seq);
449                }
450                s.setString(2, destination.getQualifiedName());
451                s.setString(3, clientId);
452                s.setString(4, subscriptionName);
453                s.setLong(5, priority);
454                if (this.batchStatments) {
455                    s.addBatch();
456                } else if (s.executeUpdate() != 1) {
457                    throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName);
458                }
459            } finally {
460                cleanupExclusiveLock.readLock().unlock();
461                if (!this.batchStatments) {
462                    close(s);
463                }
464            }
465        }
466    
467    
468        public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
469                                 String subscriptionName, long seq, long priority) throws SQLException, IOException {
470            PreparedStatement s = c.getUpdateLastAckStatement();
471            cleanupExclusiveLock.readLock().lock();
472            try {
473                if (s == null) {
474                    s = c.getConnection().prepareStatement(xid == null ?
475                            this.statements.getUpdateDurableLastAckStatement() :
476                            this.statements.getUpdateDurableLastAckInTxStatement());
477                    if (this.batchStatments) {
478                        c.setUpdateLastAckStatement(s);
479                    }
480                }
481                if (xid != null) {
482                    byte[] xidVal = encodeXid(xid, seq, priority);
483                    setBinaryData(s, 1, xidVal);
484                } else {
485                    s.setLong(1, seq);
486                }
487                s.setString(2, destination.getQualifiedName());
488                s.setString(3, clientId);
489                s.setString(4, subscriptionName);
490    
491                if (this.batchStatments) {
492                    s.addBatch();
493                } else if (s.executeUpdate() != 1) {
494                    throw new IOException("Could not update last ack seq : "
495                                + seq + ", for sub: " + subscriptionName);
496                }
497            } finally {
498                cleanupExclusiveLock.readLock().unlock();
499                if (!this.batchStatments) {
500                    close(s);
501                }            
502            }
503        }
504    
505        private byte[] encodeXid(XATransactionId xid, long seq, long priority) {
506            byte[] xidVal = xid.getEncodedXidBytes();
507            // encode the update
508            DataByteArrayOutputStream outputStream = xid.internalOutputStream();
509            outputStream.position(1);
510            outputStream.writeLong(seq);
511            outputStream.writeByte(Long.valueOf(priority).byteValue());
512            return xidVal;
513        }
514    
515        @Override
516        public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException {
517            PreparedStatement s = null;
518            cleanupExclusiveLock.readLock().lock();
519            try {
520                s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement());
521                s.setString(1, destination.getQualifiedName());
522                s.setString(2, clientId);
523                s.setString(3, subName);
524                s.setLong(4, priority);
525                if (s.executeUpdate() != 1) {
526                    throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName);
527                }
528            } finally {
529                cleanupExclusiveLock.readLock().unlock();
530                close(s);
531            }
532        }
533    
534        public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
535                String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
536            // dumpTables(c,
537            // destination.getQualifiedName(),clientId,subscriptionName);
538            PreparedStatement s = null;
539            ResultSet rs = null;
540            cleanupExclusiveLock.readLock().lock();
541            try {
542                s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
543                s.setString(1, destination.getQualifiedName());
544                s.setString(2, clientId);
545                s.setString(3, subscriptionName);
546                rs = s.executeQuery();
547                if (this.statements.isUseExternalMessageReferences()) {
548                    while (rs.next()) {
549                        if (!listener.recoverMessageReference(rs.getString(2))) {
550                            break;
551                        }
552                    }
553                } else {
554                    while (rs.next()) {
555                        if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
556                            break;
557                        }
558                    }
559                }
560            } finally {
561                cleanupExclusiveLock.readLock().unlock();
562                close(rs);
563                close(s);
564            }
565        }
566    
567        public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
568                String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
569            
570            PreparedStatement s = null;
571            ResultSet rs = null;
572            cleanupExclusiveLock.readLock().lock();
573            try {
574                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
575                s.setMaxRows(Math.min(maxReturned * 2, maxRows));
576                s.setString(1, destination.getQualifiedName());
577                s.setString(2, clientId);
578                s.setString(3, subscriptionName);
579                s.setLong(4, seq);
580                rs = s.executeQuery();
581                int count = 0;
582                if (this.statements.isUseExternalMessageReferences()) {
583                    while (rs.next() && count < maxReturned) {
584                        if (listener.recoverMessageReference(rs.getString(1))) {
585                            count++;
586                        }
587                    }
588                } else {
589                    while (rs.next() && count < maxReturned) {
590                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
591                            count++;
592                        }
593                    }
594                }
595            } finally {
596                cleanupExclusiveLock.readLock().unlock();
597                close(rs);
598                close(s);
599            }
600        }
601    
602        public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
603                String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
604    
605            PreparedStatement s = null;
606            ResultSet rs = null;
607            cleanupExclusiveLock.readLock().lock();
608            try {
609                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
610                s.setMaxRows(Math.min(maxReturned * 2, maxRows));
611                s.setString(1, destination.getQualifiedName());
612                s.setString(2, clientId);
613                s.setString(3, subscriptionName);
614                s.setLong(4, seq);
615                s.setLong(5, priority);
616                rs = s.executeQuery();
617                int count = 0;
618                if (this.statements.isUseExternalMessageReferences()) {
619                    while (rs.next() && count < maxReturned) {
620                        if (listener.recoverMessageReference(rs.getString(1))) {
621                            count++;
622                        }
623                    }
624                } else {
625                    while (rs.next() && count < maxReturned) {
626                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
627                            count++;
628                        }
629                    }
630                }
631            } finally {
632                cleanupExclusiveLock.readLock().unlock();
633                close(rs);
634                close(s);
635            }
636        }
637    
638        public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
639                String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
640            PreparedStatement s = null;
641            ResultSet rs = null;
642            int result = 0;
643            cleanupExclusiveLock.readLock().lock();
644            try {
645                if (isPrioritizedMessages) {
646                    s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
647                } else {
648                    s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());    
649                }
650                s.setString(1, destination.getQualifiedName());
651                s.setString(2, clientId);
652                s.setString(3, subscriptionName);
653                rs = s.executeQuery();
654                if (rs.next()) {
655                    result = rs.getInt(1);
656                }
657            } finally {
658                cleanupExclusiveLock.readLock().unlock();
659                close(rs);
660                close(s);
661            }
662            return result;
663        }
664    
665        /**
666         * @param c 
667         * @param info 
668         * @param retroactive 
669         * @throws SQLException 
670         * @throws IOException 
671         */
672        public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
673                throws SQLException, IOException {
674            // dumpTables(c, destination.getQualifiedName(), clientId,
675            // subscriptionName);
676            PreparedStatement s = null;
677            cleanupExclusiveLock.readLock().lock();
678            try {
679                long lastMessageId = -1;
680                if (!retroactive) {
681                    s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
682                    ResultSet rs = null;
683                    try {
684                        rs = s.executeQuery();
685                        if (rs.next()) {
686                            lastMessageId = rs.getLong(1);
687                        }
688                    } finally {
689                        close(rs);
690                        close(s);
691                    }
692                }
693                s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
694                int maxPriority = 1;
695                if (isPrioritizedMessages) {
696                    maxPriority = 10;
697                }
698    
699                for (int priority = 0; priority < maxPriority; priority++) {
700                    s.setString(1, info.getDestination().getQualifiedName());
701                    s.setString(2, info.getClientId());
702                    s.setString(3, info.getSubscriptionName());
703                    s.setString(4, info.getSelector());
704                    s.setLong(5, lastMessageId);
705                    s.setString(6, info.getSubscribedDestination().getQualifiedName());
706                    s.setLong(7, priority);
707    
708                    if (s.executeUpdate() != 1) {
709                        throw new IOException("Could not create durable subscription for: " + info.getClientId());
710                    }
711                }
712    
713            } finally {
714                cleanupExclusiveLock.readLock().unlock();
715                close(s);
716            }
717        }
718    
719        public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
720                String clientId, String subscriptionName) throws SQLException, IOException {
721            PreparedStatement s = null;
722            ResultSet rs = null;
723            cleanupExclusiveLock.readLock().lock();
724            try {
725                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
726                s.setString(1, destination.getQualifiedName());
727                s.setString(2, clientId);
728                s.setString(3, subscriptionName);
729                rs = s.executeQuery();
730                if (!rs.next()) {
731                    return null;
732                }
733                SubscriptionInfo subscription = new SubscriptionInfo();
734                subscription.setDestination(destination);
735                subscription.setClientId(clientId);
736                subscription.setSubscriptionName(subscriptionName);
737                subscription.setSelector(rs.getString(1));
738                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
739                        ActiveMQDestination.QUEUE_TYPE));
740                return subscription;
741            } finally {
742                cleanupExclusiveLock.readLock().unlock();
743                close(rs);
744                close(s);
745            }
746        }
747    
748        public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
749                throws SQLException, IOException {
750            PreparedStatement s = null;
751            ResultSet rs = null;
752            cleanupExclusiveLock.readLock().lock();
753            try {
754                s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
755                s.setString(1, destination.getQualifiedName());
756                rs = s.executeQuery();
757                ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
758                while (rs.next()) {
759                    SubscriptionInfo subscription = new SubscriptionInfo();
760                    subscription.setDestination(destination);
761                    subscription.setSelector(rs.getString(1));
762                    subscription.setSubscriptionName(rs.getString(2));
763                    subscription.setClientId(rs.getString(3));
764                    subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
765                            ActiveMQDestination.QUEUE_TYPE));
766                    rc.add(subscription);
767                }
768                return rc.toArray(new SubscriptionInfo[rc.size()]);
769            } finally {
770                cleanupExclusiveLock.readLock().unlock();
771                close(rs);
772                close(s);
773            }
774        }
775    
776        public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
777                IOException {
778            PreparedStatement s = null;
779            cleanupExclusiveLock.readLock().lock();
780            try {
781                s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
782                s.setString(1, destinationName.getQualifiedName());
783                s.executeUpdate();
784                s.close();
785                s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
786                s.setString(1, destinationName.getQualifiedName());
787                s.executeUpdate();
788            } finally {
789                cleanupExclusiveLock.readLock().unlock();
790                close(s);
791            }
792        }
793    
794        public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
795                String subscriptionName) throws SQLException, IOException {
796            PreparedStatement s = null;
797            cleanupExclusiveLock.readLock().lock();
798            try {
799                s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
800                s.setString(1, destination.getQualifiedName());
801                s.setString(2, clientId);
802                s.setString(3, subscriptionName);
803                s.executeUpdate();
804            } finally {
805                cleanupExclusiveLock.readLock().unlock();
806                close(s);
807            }
808        }
809    
810        char priorityIterator = 0; // unsigned
811        public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
812            PreparedStatement s = null;
813            cleanupExclusiveLock.writeLock().lock();
814            try {
815                LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
816                s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
817                int priority = priorityIterator++%10;
818                s.setInt(1, priority);
819                s.setInt(2, priority);
820                int i = s.executeUpdate();
821                LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
822            } finally {
823                cleanupExclusiveLock.writeLock().unlock();
824                close(s);
825            }
826        }
827    
828        public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
829                String clientId, String subscriberName) throws SQLException, IOException {
830            PreparedStatement s = null;
831            ResultSet rs = null;
832            long result = -1;
833            cleanupExclusiveLock.readLock().lock();
834            try {
835                s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
836                s.setString(1, destination.getQualifiedName());
837                s.setString(2, clientId);
838                s.setString(3, subscriberName);
839                rs = s.executeQuery();
840                if (rs.next()) {
841                    result = rs.getLong(1);
842                    if (result == 0 && rs.wasNull()) {
843                        result = -1;
844                    }
845                }
846            } finally {
847                cleanupExclusiveLock.readLock().unlock();
848                close(rs);
849                close(s);
850            }
851            return result;
852        }
853    
854        protected static void close(PreparedStatement s) {
855            try {
856                s.close();
857            } catch (Throwable e) {
858            }
859        }
860    
861        protected static void close(ResultSet rs) {
862            try {
863                rs.close();
864            } catch (Throwable e) {
865            }
866        }
867    
868        public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
869            HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
870            PreparedStatement s = null;
871            ResultSet rs = null;
872            cleanupExclusiveLock.readLock().lock();
873            try {
874                s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
875                rs = s.executeQuery();
876                while (rs.next()) {
877                    rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
878                }
879            } finally {
880                cleanupExclusiveLock.readLock().unlock();
881                close(rs);
882                close(s);
883            }
884            return rc;
885        }
886    
887        /**
888         * @return true if batchStements
889         */
890        public boolean isBatchStatments() {
891            return this.batchStatments;
892        }
893    
894        /**
895         * @param batchStatments
896         */
897        public void setBatchStatments(boolean batchStatments) {
898            this.batchStatments = batchStatments;
899        }
900    
901        public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
902            this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
903        }
904    
905        /**
906         * @return the statements
907         */
908        public Statements getStatements() {
909            return this.statements;
910        }
911    
912        public void setStatements(Statements statements) {
913            this.statements = statements;
914        }
915    
916        public int getMaxRows() {
917            return maxRows;
918        }
919    
920        /**
921         * the max value for statement maxRows, used to limit jdbc queries
922         */
923        public void setMaxRows(int maxRows) {
924            this.maxRows = maxRows;
925        }
926    
927        @Override
928        public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
929            PreparedStatement s = null;
930            cleanupExclusiveLock.readLock().lock();
931            try {
932                s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
933                s.setString(1, destination.getQualifiedName());
934                s.setString(2, destination.getQualifiedName());
935                s.setString(3, destination.getQualifiedName());
936                s.setString(4, null);
937                s.setLong(5, 0);
938                s.setString(6, destination.getQualifiedName());
939                s.setLong(7, 11);  // entry out of priority range
940    
941                if (s.executeUpdate() != 1) {
942                    throw new IOException("Could not create ack record for destination: " + destination);
943                }
944            } finally {
945                cleanupExclusiveLock.readLock().unlock();
946                close(s);
947            }
948        }
949    
950        @Override
951        public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException {
952            PreparedStatement s = null;
953            ResultSet rs = null;
954            cleanupExclusiveLock.readLock().lock();
955            try {
956                s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement());
957                rs = s.executeQuery();
958                while (rs.next()) {
959                    long id = rs.getLong(1);
960                    byte[] encodedXid = getBinaryData(rs, 2);
961                    if (encodedXid[0] == '+') {
962                        jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3));
963                    } else {
964                        jdbcMemoryTransactionStore.recoverAck(id, encodedXid, getBinaryData(rs, 3));
965                    }
966                }
967    
968                close(rs);
969                close(s);
970    
971                s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement());
972                rs = s.executeQuery();
973                while (rs.next()) {
974                    byte[] encodedXid = getBinaryData(rs, 1);
975                    String destination = rs.getString(2);
976                    String subName = rs.getString(3);
977                    String subId = rs.getString(4);
978                    jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
979                            ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE),
980                            subName, subId);
981                }
982            } finally {
983                close(rs);
984                cleanupExclusiveLock.readLock().unlock();
985                close(s);
986            }
987        }
988    
989        @Override
990        public void doCommitAddOp(TransactionContext c, long sequence) throws SQLException, IOException {
991            PreparedStatement s = null;
992            cleanupExclusiveLock.readLock().lock();
993            try {
994                s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement());
995                s.setLong(1, sequence);
996                if (s.executeUpdate() != 1) {
997                    throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence);
998                }
999            } finally {
1000                cleanupExclusiveLock.readLock().unlock();
1001                close(s);
1002            }
1003        }
1004    
1005    
1006        public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
1007                IOException {
1008            PreparedStatement s = null;
1009            ResultSet rs = null;
1010            int result = 0;
1011            cleanupExclusiveLock.readLock().lock();
1012            try {
1013                s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
1014                s.setString(1, destination.getQualifiedName());
1015                rs = s.executeQuery();
1016                if (rs.next()) {
1017                    result = rs.getInt(1);
1018                }
1019            } finally {
1020                cleanupExclusiveLock.readLock().unlock();
1021                close(rs);
1022                close(s);
1023            }
1024            return result;
1025        }
1026    
1027        public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
1028                long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
1029            PreparedStatement s = null;
1030            ResultSet rs = null;
1031            cleanupExclusiveLock.readLock().lock();
1032            try {
1033                if (isPrioritizedMessages) {
1034                    s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
1035                } else {
1036                    s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
1037                }
1038                s.setMaxRows(Math.max(maxReturned * 2, maxRows));
1039                s.setString(1, destination.getQualifiedName());
1040                s.setLong(2, nextSeq);
1041                if (isPrioritizedMessages) {
1042                    s.setLong(3, priority);
1043                    s.setLong(4, priority);
1044                }
1045                rs = s.executeQuery();
1046                int count = 0;
1047                if (this.statements.isUseExternalMessageReferences()) {
1048                    while (rs.next() && count < maxReturned) {
1049                        if (listener.recoverMessageReference(rs.getString(1))) {
1050                            count++;
1051                        } else {
1052                            LOG.debug("Stopped recover next messages");
1053                            break;
1054                        }
1055                    }
1056                } else {
1057                    while (rs.next() && count < maxReturned) {
1058                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
1059                            count++;
1060                        } else {
1061                            LOG.debug("Stopped recover next messages");
1062                            break;
1063                        }
1064                    }
1065                }
1066            } catch (Exception e) {
1067                e.printStackTrace();
1068            } finally {
1069                cleanupExclusiveLock.readLock().unlock();
1070                close(rs);
1071                close(s);
1072            }
1073        }
1074    
1075        public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1076                throws SQLException, IOException {
1077            PreparedStatement s = null;
1078            ResultSet rs = null;
1079            cleanupExclusiveLock.readLock().lock();
1080            try {
1081                s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1082                s.setString(1, id.toString());
1083                rs = s.executeQuery();
1084                long seq = -1;
1085                if (rs.next()) {
1086                    seq = rs.getLong(1);
1087                }
1088                return seq;
1089            } finally {
1090                cleanupExclusiveLock.readLock().unlock();
1091                close(rs);
1092                close(s);
1093            }
1094        }
1095    
1096        public static void dumpTables(Connection c, String destinationName, String clientId, String
1097          subscriptionName) throws SQLException { 
1098            printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 
1099            printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 
1100            PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 
1101                    + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 
1102                    + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 
1103                    + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 
1104                    + " ORDER BY M.ID");
1105          s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
1106          printQuery(s,System.out); }
1107    
1108        public static void dumpTables(java.sql.Connection c) throws SQLException {
1109            printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out);
1110            printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
1111        }
1112    
1113        public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
1114                throws SQLException {
1115            printQuery(c.prepareStatement(query), out);
1116        }
1117    
1118        public static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
1119                throws SQLException {
1120    
1121            ResultSet set = null;
1122            try {
1123                set = s.executeQuery();
1124                java.sql.ResultSetMetaData metaData = set.getMetaData();
1125                for (int i = 1; i <= metaData.getColumnCount(); i++) {
1126                    if (i == 1)
1127                        out.print("||");
1128                    out.print(metaData.getColumnName(i) + "||");
1129                }
1130                out.println();
1131                while (set.next()) {
1132                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
1133                        if (i == 1)
1134                            out.print("|");
1135                        out.print(set.getString(i) + "|");
1136                    }
1137                    out.println();
1138                }
1139            } finally {
1140                try {
1141                    set.close();
1142                } catch (Throwable ignore) {
1143                }
1144                try {
1145                    s.close();
1146                } catch (Throwable ignore) {
1147                }
1148            }
1149        }
1150    
1151    }