001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc.adapter;
018    
019    import java.io.IOException;
020    import java.sql.PreparedStatement;
021    import java.sql.ResultSet;
022    import java.sql.SQLException;
023    import java.sql.Statement;
024    import java.util.ArrayList;
025    import java.util.HashSet;
026    import java.util.LinkedList;
027    import java.util.Set;
028    import java.util.concurrent.locks.ReadWriteLock;
029    import java.util.concurrent.locks.ReentrantReadWriteLock;
030    
031    import org.apache.activemq.broker.region.BaseDestination;
032    import org.apache.activemq.command.ActiveMQDestination;
033    import org.apache.activemq.command.MessageId;
034    import org.apache.activemq.command.ProducerId;
035    import org.apache.activemq.command.SubscriptionInfo;
036    import org.apache.activemq.command.XATransactionId;
037    import org.apache.activemq.store.jdbc.JDBCAdapter;
038    import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
039    import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
040    import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
041    import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
042    import org.apache.activemq.store.jdbc.Statements;
043    import org.apache.activemq.store.jdbc.TransactionContext;
044    import org.apache.activemq.util.DataByteArrayOutputStream;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is
050     * encouraged to override the default implementation of methods to account for differences in JDBC Driver
051     * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/>
052     * The databases/JDBC drivers that use this adapter are:
053     * <ul>
054     * <li></li>
055     * </ul>
056     * 
057     * @org.apache.xbean.XBean element="defaultJDBCAdapter"
058     * 
059     * 
060     */
061    public class DefaultJDBCAdapter implements JDBCAdapter {
062        private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class);
063        public static final int MAX_ROWS = BaseDestination.MAX_PAGE_SIZE;
064        protected Statements statements;
065        protected boolean batchStatments = true;
066        protected boolean prioritizedMessages;
067        protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
068        // needs to be min twice the prefetch for a durable sub and large enough for selector range
069        protected int maxRows = MAX_ROWS;
070    
071        protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
072            s.setBytes(index, data);
073        }
074    
075        protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
076            return rs.getBytes(index);
077        }
078    
079        public void doCreateTables(TransactionContext c) throws SQLException, IOException {
080            Statement s = null;
081            cleanupExclusiveLock.writeLock().lock();
082            try {
083                // Check to see if the table already exists. If it does, then don't
084                // log warnings during startup.
085                // Need to run the scripts anyways since they may contain ALTER
086                // statements that upgrade a previous version
087                // of the table
088                boolean alreadyExists = false;
089                ResultSet rs = null;
090                try {
091                    rs = c.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(),
092                            new String[] { "TABLE" });
093                    alreadyExists = rs.next();
094                } catch (Throwable ignore) {
095                } finally {
096                    close(rs);
097                }
098                s = c.getConnection().createStatement();
099                String[] createStatments = this.statements.getCreateSchemaStatements();
100                for (int i = 0; i < createStatments.length; i++) {
101                    // This will fail usually since the tables will be
102                    // created already.
103                    try {
104                        LOG.debug("Executing SQL: " + createStatments[i]);
105                        s.execute(createStatments[i]);
106                    } catch (SQLException e) {
107                        if (alreadyExists) {
108                            LOG.debug("Could not create JDBC tables; The message table already existed." + " Failure was: "
109                                    + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
110                                    + " Vendor code: " + e.getErrorCode());
111                        } else {
112                            LOG.warn("Could not create JDBC tables; they could already exist." + " Failure was: "
113                                    + createStatments[i] + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState()
114                                    + " Vendor code: " + e.getErrorCode());
115                            JDBCPersistenceAdapter.log("Failure details: ", e);
116                        }
117                    }
118                }
119                c.getConnection().commit();
120            } finally {
121                cleanupExclusiveLock.writeLock().unlock();
122                try {
123                    s.close();
124                } catch (Throwable e) {
125                }
126            }
127        }
128    
129        public void doDropTables(TransactionContext c) throws SQLException, IOException {
130            Statement s = null;
131            cleanupExclusiveLock.writeLock().lock();
132            try {
133                s = c.getConnection().createStatement();
134                String[] dropStatments = this.statements.getDropSchemaStatements();
135                for (int i = 0; i < dropStatments.length; i++) {
136                    // This will fail usually since the tables will be
137                    // created already.
138                    try {
139                        LOG.debug("Executing SQL: " + dropStatments[i]);
140                        s.execute(dropStatments[i]);
141                    } catch (SQLException e) {
142                        LOG.warn("Could not drop JDBC tables; they may not exist." + " Failure was: " + dropStatments[i]
143                                + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: "
144                                + e.getErrorCode());
145                        JDBCPersistenceAdapter.log("Failure details: ", e);
146                    }
147                }
148                c.getConnection().commit();
149            } finally {
150                cleanupExclusiveLock.writeLock().unlock();
151                try {
152                    s.close();
153                } catch (Throwable e) {
154                }
155            }
156        }
157    
158        public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException {
159            PreparedStatement s = null;
160            ResultSet rs = null;
161            cleanupExclusiveLock.readLock().lock();
162            try {
163                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
164                rs = s.executeQuery();
165                long seq1 = 0;
166                if (rs.next()) {
167                    seq1 = rs.getLong(1);
168                }
169                rs.close();
170                s.close();
171                s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement());
172                rs = s.executeQuery();
173                long seq2 = 0;
174                if (rs.next()) {
175                    seq2 = rs.getLong(1);
176                }
177                long seq = Math.max(seq1, seq2);
178                return seq;
179            } finally {
180                cleanupExclusiveLock.readLock().unlock();
181                close(rs);
182                close(s);
183            }
184        }
185        
186        public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException {
187            PreparedStatement s = null;
188            ResultSet rs = null;
189            cleanupExclusiveLock.readLock().lock();
190            try {
191                s = c.getConnection().prepareStatement(
192                        this.statements.getFindMessageByIdStatement());
193                s.setLong(1, storeSequenceId);
194                rs = s.executeQuery();
195                if (!rs.next()) {
196                    return null;
197                }
198                return getBinaryData(rs, 1);
199            } finally {
200                cleanupExclusiveLock.readLock().unlock();
201                close(rs);
202                close(s);
203            }
204        }
205    
206    
207        /**
208         * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
209         */
210        public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
211                                 long expiration, byte priority, XATransactionId xid) throws SQLException, IOException {
212            PreparedStatement s = c.getAddMessageStatement();
213            cleanupExclusiveLock.readLock().lock();
214            try {
215                if (s == null) {
216                    s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
217                    if (this.batchStatments) {
218                        c.setAddMessageStatement(s);
219                    }
220                }
221                s.setLong(1, sequence);
222                s.setString(2, messageID.getProducerId().toString());
223                s.setLong(3, messageID.getProducerSequenceId());
224                s.setString(4, destination.getQualifiedName());
225                s.setLong(5, expiration);
226                s.setLong(6, priority);
227                setBinaryData(s, 7, data);
228                if (xid != null) {
229                    byte[] xidVal = xid.getEncodedXidBytes();
230                    xidVal[0] = '+';
231                    setBinaryData(s, 8, xidVal);
232                } else {
233                    setBinaryData(s, 8, null);
234                }
235                if (this.batchStatments) {
236                    s.addBatch();
237                } else if (s.executeUpdate() != 1) {
238                    throw new SQLException("Failed add a message");
239                }
240            } finally {
241                cleanupExclusiveLock.readLock().unlock();
242                if (!this.batchStatments) {
243                    if (s != null) {
244                        s.close();
245                    }
246                }
247            }
248        }
249    
250        public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination,
251                long expirationTime, String messageRef) throws SQLException, IOException {
252            PreparedStatement s = c.getAddMessageStatement();
253            cleanupExclusiveLock.readLock().lock();
254            try {
255                if (s == null) {
256                    s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement());
257                    if (this.batchStatments) {
258                        c.setAddMessageStatement(s);
259                    }
260                }
261                s.setLong(1, messageID.getBrokerSequenceId());
262                s.setString(2, messageID.getProducerId().toString());
263                s.setLong(3, messageID.getProducerSequenceId());
264                s.setString(4, destination.getQualifiedName());
265                s.setLong(5, expirationTime);
266                s.setString(6, messageRef);
267                if (this.batchStatments) {
268                    s.addBatch();
269                } else if (s.executeUpdate() != 1) {
270                    throw new SQLException("Failed add a message");
271                }
272            } finally {
273                cleanupExclusiveLock.readLock().unlock();
274                if (!this.batchStatments) {
275                    s.close();
276                }
277            }
278        }
279    
280        public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException {
281            PreparedStatement s = null;
282            ResultSet rs = null;
283            cleanupExclusiveLock.readLock().lock();
284            try {
285                s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement());
286                s.setString(1, messageID.getProducerId().toString());
287                s.setLong(2, messageID.getProducerSequenceId());
288                s.setString(3, destination.getQualifiedName());
289                rs = s.executeQuery();
290                if (!rs.next()) {
291                    return new long[]{0,0};
292                }
293                return new long[]{rs.getLong(1), rs.getLong(2)};
294            } finally {
295                cleanupExclusiveLock.readLock().unlock();
296                close(rs);
297                close(s);
298            }
299        }
300    
301        public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
302            PreparedStatement s = null;
303            ResultSet rs = null;
304            cleanupExclusiveLock.readLock().lock();
305            try {
306                s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
307                s.setString(1, id.getProducerId().toString());
308                s.setLong(2, id.getProducerSequenceId());
309                rs = s.executeQuery();
310                if (!rs.next()) {
311                    return null;
312                }
313                return getBinaryData(rs, 1);
314            } finally {
315                cleanupExclusiveLock.readLock().unlock();
316                close(rs);
317                close(s);
318            }
319        }
320    
321        public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException {
322            PreparedStatement s = null;
323            ResultSet rs = null;
324            cleanupExclusiveLock.readLock().lock();
325            try {
326                s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement());
327                s.setLong(1, seq);
328                rs = s.executeQuery();
329                if (!rs.next()) {
330                    return null;
331                }
332                return rs.getString(1);
333            } finally {
334                cleanupExclusiveLock.readLock().unlock();
335                close(rs);
336                close(s);
337            }
338        }
339    
340        /**
341         * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome
342         */
343        public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException {
344            PreparedStatement s = c.getRemovedMessageStatement();
345            cleanupExclusiveLock.readLock().lock();
346            try {
347                if (s == null) {
348                    s = c.getConnection().prepareStatement(xid == null ?
349                            this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement());
350                    if (this.batchStatments) {
351                        c.setRemovedMessageStatement(s);
352                    }
353                }
354                if (xid == null) {
355                    s.setLong(1, seq);
356                } else {
357                    byte[] xidVal = xid.getEncodedXidBytes();
358                    xidVal[0] = '-';
359                    setBinaryData(s, 1, xidVal);
360                    s.setLong(2, seq);
361                }
362                if (this.batchStatments) {
363                    s.addBatch();
364                } else if (s.executeUpdate() != 1) {
365                    throw new SQLException("Failed to remove message");
366                }
367            } finally {
368                cleanupExclusiveLock.readLock().unlock();
369                if (!this.batchStatments && s != null) {
370                    s.close();
371                }
372            }
373        }
374    
375        public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener)
376                throws Exception {
377            PreparedStatement s = null;
378            ResultSet rs = null;
379            cleanupExclusiveLock.readLock().lock();
380            try {
381                s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement());
382                s.setString(1, destination.getQualifiedName());
383                rs = s.executeQuery();
384                if (this.statements.isUseExternalMessageReferences()) {
385                    while (rs.next()) {
386                        if (!listener.recoverMessageReference(rs.getString(2))) {
387                            break;
388                        }
389                    }
390                } else {
391                    while (rs.next()) {
392                        if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
393                            break;
394                        }
395                    }
396                }
397            } finally {
398                cleanupExclusiveLock.readLock().unlock();
399                close(rs);
400                close(s);
401            }
402        }
403    
404        public void doMessageIdScan(TransactionContext c, int limit, 
405                JDBCMessageIdScanListener listener) throws SQLException, IOException {
406            PreparedStatement s = null;
407            ResultSet rs = null;
408            cleanupExclusiveLock.readLock().lock();
409            try {
410                s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement());
411                s.setMaxRows(limit);
412                rs = s.executeQuery();
413                // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid
414                LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>();
415                while (rs.next()) {
416                    reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3)));
417                }
418                if (LOG.isDebugEnabled()) {
419                    LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids");
420                }
421                for (MessageId id : reverseOrderIds) {
422                    listener.messageId(id);
423                }
424            } finally {
425                cleanupExclusiveLock.readLock().unlock();
426                close(rs);
427                close(s);
428            }
429        }
430        
431        public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
432                                             String subscriptionName, long seq, long priority) throws SQLException, IOException {
433            PreparedStatement s = c.getUpdateLastAckStatement();
434            cleanupExclusiveLock.readLock().lock();
435            try {
436                if (s == null) {
437                    s = c.getConnection().prepareStatement(xid == null ?
438                            this.statements.getUpdateDurableLastAckWithPriorityStatement() :
439                            this.statements.getUpdateDurableLastAckWithPriorityInTxStatement());
440                    if (this.batchStatments) {
441                        c.setUpdateLastAckStatement(s);
442                    }
443                }
444                if (xid != null) {
445                    byte[] xidVal = encodeXid(xid, seq, priority);
446                    setBinaryData(s, 1, xidVal);
447                } else {
448                    s.setLong(1, seq);
449                }
450                s.setString(2, destination.getQualifiedName());
451                s.setString(3, clientId);
452                s.setString(4, subscriptionName);
453                s.setLong(5, priority);
454                if (this.batchStatments) {
455                    s.addBatch();
456                } else if (s.executeUpdate() != 1) {
457                    throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName);
458                }
459            } finally {
460                cleanupExclusiveLock.readLock().unlock();
461                if (!this.batchStatments) {
462                    close(s);
463                }
464            }
465        }
466    
467    
468        public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId,
469                                 String subscriptionName, long seq, long priority) throws SQLException, IOException {
470            PreparedStatement s = c.getUpdateLastAckStatement();
471            cleanupExclusiveLock.readLock().lock();
472            try {
473                if (s == null) {
474                    s = c.getConnection().prepareStatement(xid == null ?
475                            this.statements.getUpdateDurableLastAckStatement() :
476                            this.statements.getUpdateDurableLastAckInTxStatement());
477                    if (this.batchStatments) {
478                        c.setUpdateLastAckStatement(s);
479                    }
480                }
481                if (xid != null) {
482                    byte[] xidVal = encodeXid(xid, seq, priority);
483                    setBinaryData(s, 1, xidVal);
484                } else {
485                    s.setLong(1, seq);
486                }
487                s.setString(2, destination.getQualifiedName());
488                s.setString(3, clientId);
489                s.setString(4, subscriptionName);
490    
491                if (this.batchStatments) {
492                    s.addBatch();
493                } else if (s.executeUpdate() != 1) {
494                    throw new IOException("Could not update last ack seq : "
495                                + seq + ", for sub: " + subscriptionName);
496                }
497            } finally {
498                cleanupExclusiveLock.readLock().unlock();
499                if (!this.batchStatments) {
500                    close(s);
501                }            
502            }
503        }
504    
505        private byte[] encodeXid(XATransactionId xid, long seq, long priority) {
506            byte[] xidVal = xid.getEncodedXidBytes();
507            // encode the update
508            DataByteArrayOutputStream outputStream = xid.internalOutputStream();
509            outputStream.position(1);
510            outputStream.writeLong(seq);
511            outputStream.writeByte(Long.valueOf(priority).byteValue());
512            return xidVal;
513        }
514    
515        @Override
516        public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException {
517            PreparedStatement s = null;
518            cleanupExclusiveLock.readLock().lock();
519            try {
520                s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement());
521                s.setString(1, destination.getQualifiedName());
522                s.setString(2, clientId);
523                s.setString(3, subName);
524                s.setLong(4, priority);
525                if (s.executeUpdate() != 1) {
526                    throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName);
527                }
528            } finally {
529                cleanupExclusiveLock.readLock().unlock();
530                close(s);
531            }
532        }
533    
534        public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
535                String subscriptionName, JDBCMessageRecoveryListener listener) throws Exception {
536            // dumpTables(c,
537            // destination.getQualifiedName(),clientId,subscriptionName);
538            PreparedStatement s = null;
539            ResultSet rs = null;
540            cleanupExclusiveLock.readLock().lock();
541            try {
542                s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement());
543                s.setString(1, destination.getQualifiedName());
544                s.setString(2, clientId);
545                s.setString(3, subscriptionName);
546                rs = s.executeQuery();
547                if (this.statements.isUseExternalMessageReferences()) {
548                    while (rs.next()) {
549                        if (!listener.recoverMessageReference(rs.getString(2))) {
550                            break;
551                        }
552                    }
553                } else {
554                    while (rs.next()) {
555                        if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
556                            break;
557                        }
558                    }
559                }
560            } finally {
561                cleanupExclusiveLock.readLock().unlock();
562                close(rs);
563                close(s);
564            }
565        }
566    
567        public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId,
568                String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
569            
570            PreparedStatement s = null;
571            ResultSet rs = null;
572            cleanupExclusiveLock.readLock().lock();
573            try {
574                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
575                s.setMaxRows(Math.max(maxReturned * 2, maxRows));
576                s.setString(1, destination.getQualifiedName());
577                s.setString(2, clientId);
578                s.setString(3, subscriptionName);
579                s.setLong(4, seq);
580                rs = s.executeQuery();
581                int count = 0;
582                if (this.statements.isUseExternalMessageReferences()) {
583                    while (rs.next() && count < maxReturned) {
584                        if (listener.recoverMessageReference(rs.getString(1))) {
585                            count++;
586                        }
587                    }
588                } else {
589                    while (rs.next() && count < maxReturned) {
590                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
591                            count++;
592                        }
593                    }
594                }
595            } finally {
596                cleanupExclusiveLock.readLock().unlock();
597                close(rs);
598                close(s);
599            }
600        }
601    
602        public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
603                String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
604    
605            PreparedStatement s = null;
606            ResultSet rs = null;
607            cleanupExclusiveLock.readLock().lock();
608            try {
609                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
610                s.setMaxRows(Math.max(maxReturned * 2, maxRows));
611                s.setString(1, destination.getQualifiedName());
612                s.setString(2, clientId);
613                s.setString(3, subscriptionName);
614                s.setLong(4, seq);
615                s.setLong(5, priority);
616                rs = s.executeQuery();
617                int count = 0;
618                if (this.statements.isUseExternalMessageReferences()) {
619                    while (rs.next() && count < maxReturned) {
620                        if (listener.recoverMessageReference(rs.getString(1))) {
621                            count++;
622                        }
623                    }
624                } else {
625                    while (rs.next() && count < maxReturned) {
626                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
627                            count++;
628                        }
629                    }
630                }
631            } finally {
632                cleanupExclusiveLock.readLock().unlock();
633                close(rs);
634                close(s);
635            }
636        }
637    
638        public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
639                String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
640            PreparedStatement s = null;
641            ResultSet rs = null;
642            int result = 0;
643            cleanupExclusiveLock.readLock().lock();
644            try {
645                if (isPrioritizedMessages) {
646                    s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
647                } else {
648                    s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());    
649                }
650                s.setString(1, destination.getQualifiedName());
651                s.setString(2, clientId);
652                s.setString(3, subscriptionName);
653                rs = s.executeQuery();
654                if (rs.next()) {
655                    result = rs.getInt(1);
656                }
657            } finally {
658                cleanupExclusiveLock.readLock().unlock();
659                close(rs);
660                close(s);
661            }
662            return result;
663        }
664    
665        /**
666         * @param c 
667         * @param info 
668         * @param retroactive 
669         * @throws SQLException 
670         * @throws IOException 
671         */
672        public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
673                throws SQLException, IOException {
674            // dumpTables(c, destination.getQualifiedName(), clientId,
675            // subscriptionName);
676            PreparedStatement s = null;
677            cleanupExclusiveLock.readLock().lock();
678            try {
679                long lastMessageId = -1;
680                if (!retroactive) {
681                    s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement());
682                    ResultSet rs = null;
683                    try {
684                        rs = s.executeQuery();
685                        if (rs.next()) {
686                            lastMessageId = rs.getLong(1);
687                        }
688                    } finally {
689                        close(rs);
690                        close(s);
691                    }
692                }
693                s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
694                int maxPriority = 1;
695                if (isPrioritizedMessages) {
696                    maxPriority = 10;
697                }
698    
699                for (int priority = 0; priority < maxPriority; priority++) {
700                    s.setString(1, info.getDestination().getQualifiedName());
701                    s.setString(2, info.getClientId());
702                    s.setString(3, info.getSubscriptionName());
703                    s.setString(4, info.getSelector());
704                    s.setLong(5, lastMessageId);
705                    s.setString(6, info.getSubscribedDestination().getQualifiedName());
706                    s.setLong(7, priority);
707    
708                    if (s.executeUpdate() != 1) {
709                        throw new IOException("Could not create durable subscription for: " + info.getClientId());
710                    }
711                }
712    
713            } finally {
714                cleanupExclusiveLock.readLock().unlock();
715                close(s);
716            }
717        }
718    
719        public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination,
720                String clientId, String subscriptionName) throws SQLException, IOException {
721            PreparedStatement s = null;
722            ResultSet rs = null;
723            cleanupExclusiveLock.readLock().lock();
724            try {
725                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement());
726                s.setString(1, destination.getQualifiedName());
727                s.setString(2, clientId);
728                s.setString(3, subscriptionName);
729                rs = s.executeQuery();
730                if (!rs.next()) {
731                    return null;
732                }
733                SubscriptionInfo subscription = new SubscriptionInfo();
734                subscription.setDestination(destination);
735                subscription.setClientId(clientId);
736                subscription.setSubscriptionName(subscriptionName);
737                subscription.setSelector(rs.getString(1));
738                subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2),
739                        ActiveMQDestination.QUEUE_TYPE));
740                return subscription;
741            } finally {
742                cleanupExclusiveLock.readLock().unlock();
743                close(rs);
744                close(s);
745            }
746        }
747    
748        public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
749                throws SQLException, IOException {
750            PreparedStatement s = null;
751            ResultSet rs = null;
752            cleanupExclusiveLock.readLock().lock();
753            try {
754                s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement());
755                s.setString(1, destination.getQualifiedName());
756                rs = s.executeQuery();
757                ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>();
758                while (rs.next()) {
759                    SubscriptionInfo subscription = new SubscriptionInfo();
760                    subscription.setDestination(destination);
761                    subscription.setSelector(rs.getString(1));
762                    subscription.setSubscriptionName(rs.getString(2));
763                    subscription.setClientId(rs.getString(3));
764                    subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4),
765                            ActiveMQDestination.QUEUE_TYPE));
766                    rc.add(subscription);
767                }
768                return rc.toArray(new SubscriptionInfo[rc.size()]);
769            } finally {
770                cleanupExclusiveLock.readLock().unlock();
771                close(rs);
772                close(s);
773            }
774        }
775    
776        public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException,
777                IOException {
778            PreparedStatement s = null;
779            cleanupExclusiveLock.readLock().lock();
780            try {
781                s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement());
782                s.setString(1, destinationName.getQualifiedName());
783                s.executeUpdate();
784                s.close();
785                s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement());
786                s.setString(1, destinationName.getQualifiedName());
787                s.executeUpdate();
788            } finally {
789                cleanupExclusiveLock.readLock().unlock();
790                close(s);
791            }
792        }
793    
794        public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId,
795                String subscriptionName) throws SQLException, IOException {
796            PreparedStatement s = null;
797            cleanupExclusiveLock.readLock().lock();
798            try {
799                s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement());
800                s.setString(1, destination.getQualifiedName());
801                s.setString(2, clientId);
802                s.setString(3, subscriptionName);
803                s.executeUpdate();
804            } finally {
805                cleanupExclusiveLock.readLock().unlock();
806                close(s);
807            }
808        }
809    
810        char priorityIterator = 0; // unsigned
811        public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
812            PreparedStatement s = null;
813            cleanupExclusiveLock.writeLock().lock();
814            try {
815                LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
816                s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
817                int priority = priorityIterator++%10;
818                s.setInt(1, priority);
819                s.setInt(2, priority);
820                int i = s.executeUpdate();
821                LOG.debug("Deleted " + i + " old message(s) at priority: " + priority);
822            } finally {
823                cleanupExclusiveLock.writeLock().unlock();
824                close(s);
825            }
826        }
827    
828        public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination,
829                String clientId, String subscriberName) throws SQLException, IOException {
830            PreparedStatement s = null;
831            ResultSet rs = null;
832            long result = -1;
833            cleanupExclusiveLock.readLock().lock();
834            try {
835                s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
836                s.setString(1, destination.getQualifiedName());
837                s.setString(2, clientId);
838                s.setString(3, subscriberName);
839                rs = s.executeQuery();
840                if (rs.next()) {
841                    result = rs.getLong(1);
842                    if (result == 0 && rs.wasNull()) {
843                        result = -1;
844                    }
845                }
846            } finally {
847                cleanupExclusiveLock.readLock().unlock();
848                close(rs);
849                close(s);
850            }
851            return result;
852        }
853    
854        protected static void close(PreparedStatement s) {
855            try {
856                s.close();
857            } catch (Throwable e) {
858            }
859        }
860    
861        protected static void close(ResultSet rs) {
862            try {
863                rs.close();
864            } catch (Throwable e) {
865            }
866        }
867    
868        public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException {
869            HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
870            PreparedStatement s = null;
871            ResultSet rs = null;
872            cleanupExclusiveLock.readLock().lock();
873            try {
874                s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement());
875                rs = s.executeQuery();
876                while (rs.next()) {
877                    rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE));
878                }
879            } finally {
880                cleanupExclusiveLock.readLock().unlock();
881                close(rs);
882                close(s);
883            }
884            return rc;
885        }
886    
887        /**
888         * @return true if batchStements
889         */
890        public boolean isBatchStatments() {
891            return this.batchStatments;
892        }
893    
894        /**
895         * @param batchStatments
896         */
897        public void setBatchStatments(boolean batchStatments) {
898            this.batchStatments = batchStatments;
899        }
900    
901        public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
902            this.statements.setUseExternalMessageReferences(useExternalMessageReferences);
903        }
904    
905        /**
906         * @return the statements
907         */
908        public Statements getStatements() {
909            return this.statements;
910        }
911    
912        public void setStatements(Statements statements) {
913            this.statements = statements;
914        }
915    
916        public int getMaxRows() {
917            return maxRows;
918        }
919    
920        public void setMaxRows(int maxRows) {
921            this.maxRows = maxRows;
922        }
923    
924        @Override
925        public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
926            PreparedStatement s = null;
927            cleanupExclusiveLock.readLock().lock();
928            try {
929                s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
930                s.setString(1, destination.getQualifiedName());
931                s.setString(2, destination.getQualifiedName());
932                s.setString(3, destination.getQualifiedName());
933                s.setString(4, null);
934                s.setLong(5, 0);
935                s.setString(6, destination.getQualifiedName());
936                s.setLong(7, 11);  // entry out of priority range
937    
938                if (s.executeUpdate() != 1) {
939                    throw new IOException("Could not create ack record for destination: " + destination);
940                }
941            } finally {
942                cleanupExclusiveLock.readLock().unlock();
943                close(s);
944            }
945        }
946    
947        @Override
948        public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException {
949            PreparedStatement s = null;
950            ResultSet rs = null;
951            cleanupExclusiveLock.readLock().lock();
952            try {
953                s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement());
954                rs = s.executeQuery();
955                while (rs.next()) {
956                    long id = rs.getLong(1);
957                    byte[] encodedXid = getBinaryData(rs, 2);
958                    if (encodedXid[0] == '+') {
959                        jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3));
960                    } else {
961                        jdbcMemoryTransactionStore.recoverAck(id, encodedXid, getBinaryData(rs, 3));
962                    }
963                }
964    
965                close(rs);
966                close(s);
967    
968                s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement());
969                rs = s.executeQuery();
970                while (rs.next()) {
971                    byte[] encodedXid = getBinaryData(rs, 1);
972                    String destination = rs.getString(2);
973                    String subName = rs.getString(3);
974                    String subId = rs.getString(4);
975                    jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
976                            ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE),
977                            subName, subId);
978                }
979            } finally {
980                close(rs);
981                cleanupExclusiveLock.readLock().unlock();
982                close(s);
983            }
984        }
985    
986        @Override
987        public void doCommitAddOp(TransactionContext c, long sequence) throws SQLException, IOException {
988            PreparedStatement s = null;
989            cleanupExclusiveLock.readLock().lock();
990            try {
991                s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement());
992                s.setLong(1, sequence);
993                if (s.executeUpdate() != 1) {
994                    throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence);
995                }
996            } finally {
997                cleanupExclusiveLock.readLock().unlock();
998                close(s);
999            }
1000        }
1001    
1002    
1003        public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
1004                IOException {
1005            PreparedStatement s = null;
1006            ResultSet rs = null;
1007            int result = 0;
1008            cleanupExclusiveLock.readLock().lock();
1009            try {
1010                s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement());
1011                s.setString(1, destination.getQualifiedName());
1012                rs = s.executeQuery();
1013                if (rs.next()) {
1014                    result = rs.getInt(1);
1015                }
1016            } finally {
1017                cleanupExclusiveLock.readLock().unlock();
1018                close(rs);
1019                close(s);
1020            }
1021            return result;
1022        }
1023    
1024        public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
1025                long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
1026            PreparedStatement s = null;
1027            ResultSet rs = null;
1028            cleanupExclusiveLock.readLock().lock();
1029            try {
1030                if (isPrioritizedMessages) {
1031                    s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
1032                } else {
1033                    s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
1034                }
1035                s.setMaxRows(Math.max(maxReturned * 2, maxRows));
1036                s.setString(1, destination.getQualifiedName());
1037                s.setLong(2, nextSeq);
1038                if (isPrioritizedMessages) {
1039                    s.setLong(3, priority);
1040                    s.setLong(4, priority);
1041                }
1042                rs = s.executeQuery();
1043                int count = 0;
1044                if (this.statements.isUseExternalMessageReferences()) {
1045                    while (rs.next() && count < maxReturned) {
1046                        if (listener.recoverMessageReference(rs.getString(1))) {
1047                            count++;
1048                        } else {
1049                            LOG.debug("Stopped recover next messages");
1050                            break;
1051                        }
1052                    }
1053                } else {
1054                    while (rs.next() && count < maxReturned) {
1055                        if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
1056                            count++;
1057                        } else {
1058                            LOG.debug("Stopped recover next messages");
1059                            break;
1060                        }
1061                    }
1062                }
1063            } catch (Exception e) {
1064                e.printStackTrace();
1065            } finally {
1066                cleanupExclusiveLock.readLock().unlock();
1067                close(rs);
1068                close(s);
1069            }
1070        }
1071    
1072        public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
1073                throws SQLException, IOException {
1074            PreparedStatement s = null;
1075            ResultSet rs = null;
1076            cleanupExclusiveLock.readLock().lock();
1077            try {
1078                s = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
1079                s.setString(1, id.toString());
1080                rs = s.executeQuery();
1081                long seq = -1;
1082                if (rs.next()) {
1083                    seq = rs.getLong(1);
1084                }
1085                return seq;
1086            } finally {
1087                cleanupExclusiveLock.readLock().unlock();
1088                close(rs);
1089                close(s);
1090            }
1091        }
1092    
1093    /*    public static void dumpTables(Connection c, String destinationName, String clientId, String
1094          subscriptionName) throws SQLException { 
1095            printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 
1096            printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 
1097            PreparedStatement s = c.prepareStatement("SELECT M.ID, D.LAST_ACKED_ID FROM " 
1098                    + "ACTIVEMQ_MSGS M, " +"ACTIVEMQ_ACKS D " 
1099                    + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?" 
1100                    + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID" 
1101                    + " ORDER BY M.ID");
1102          s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
1103          printQuery(s,System.out); }
1104    
1105        public static void dumpTables(java.sql.Connection c) throws SQLException {
1106            printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out);
1107            printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
1108        }
1109    
1110        private static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
1111                throws SQLException {
1112            printQuery(c.prepareStatement(query), out);
1113        }
1114    
1115        private static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
1116                throws SQLException {
1117    
1118            ResultSet set = null;
1119            try {
1120                set = s.executeQuery();
1121                java.sql.ResultSetMetaData metaData = set.getMetaData();
1122                for (int i = 1; i <= metaData.getColumnCount(); i++) {
1123                    if (i == 1)
1124                        out.print("||");
1125                    out.print(metaData.getColumnName(i) + "||");
1126                }
1127                out.println();
1128                while (set.next()) {
1129                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
1130                        if (i == 1)
1131                            out.print("|");
1132                        out.print(set.getString(i) + "|");
1133                    }
1134                    out.println();
1135                }
1136            } finally {
1137                try {
1138                    set.close();
1139                } catch (Throwable ignore) {
1140                }
1141                try {
1142                    s.close();
1143                } catch (Throwable ignore) {
1144                }
1145            }
1146        }  */
1147    
1148    }