001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.jdbc;
018
019import java.io.File;
020import java.io.IOException;
021import java.sql.Connection;
022import java.sql.SQLException;
023import java.util.Collections;
024import java.util.Locale;
025import java.util.Set;
026import java.util.concurrent.ScheduledFuture;
027import java.util.concurrent.ScheduledThreadPoolExecutor;
028import java.util.concurrent.ThreadFactory;
029import java.util.concurrent.TimeUnit;
030
031import javax.sql.DataSource;
032
033import org.apache.activemq.ActiveMQMessageAudit;
034import org.apache.activemq.broker.BrokerService;
035import org.apache.activemq.broker.ConnectionContext;
036import org.apache.activemq.broker.Locker;
037import org.apache.activemq.broker.scheduler.JobSchedulerStore;
038import org.apache.activemq.command.ActiveMQDestination;
039import org.apache.activemq.command.ActiveMQQueue;
040import org.apache.activemq.command.ActiveMQTopic;
041import org.apache.activemq.command.Message;
042import org.apache.activemq.command.MessageAck;
043import org.apache.activemq.command.MessageId;
044import org.apache.activemq.command.ProducerId;
045import org.apache.activemq.openwire.OpenWireFormat;
046import org.apache.activemq.store.MessageStore;
047import org.apache.activemq.store.PersistenceAdapter;
048import org.apache.activemq.store.TopicMessageStore;
049import org.apache.activemq.store.TransactionStore;
050import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
051import org.apache.activemq.store.memory.MemoryTransactionStore;
052import org.apache.activemq.usage.SystemUsage;
053import org.apache.activemq.util.ByteSequence;
054import org.apache.activemq.util.FactoryFinder;
055import org.apache.activemq.util.IOExceptionSupport;
056import org.apache.activemq.util.LongSequenceGenerator;
057import org.apache.activemq.util.ServiceStopper;
058import org.apache.activemq.util.ThreadPoolUtils;
059import org.apache.activemq.wireformat.WireFormat;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062
063/**
064 * A {@link PersistenceAdapter} implementation using JDBC for persistence
065 * storage.
066 *
067 * This persistence adapter will correctly remember prepared XA transactions,
068 * but it will not keep track of local transaction commits so that operations
069 * performed against the Message store are done as a single uow.
070 *
071 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
072 *
073 */
074public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter {
075
076    private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
077    private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
078        "META-INF/services/org/apache/activemq/store/jdbc/");
079    private static FactoryFinder lockFactoryFinder = new FactoryFinder(
080        "META-INF/services/org/apache/activemq/store/jdbc/lock/");
081
082    public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000;
083
084    private WireFormat wireFormat = new OpenWireFormat();
085    private Statements statements;
086    private JDBCAdapter adapter;
087    private MemoryTransactionStore transactionStore;
088    private ScheduledFuture<?> cleanupTicket;
089    private int cleanupPeriod = 1000 * 60 * 5;
090    private boolean useExternalMessageReferences;
091    private boolean createTablesOnStartup = true;
092    private DataSource lockDataSource;
093    private int transactionIsolation;
094    private File directory;
095    private boolean changeAutoCommitAllowed = true;
096
097    protected int maxProducersToAudit=1024;
098    protected int maxAuditDepth=1000;
099    protected boolean enableAudit=false;
100    protected int auditRecoveryDepth = 1024;
101    protected ActiveMQMessageAudit audit;
102
103    protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
104    protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
105
106    {
107        setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD);
108    }
109
110    public JDBCPersistenceAdapter() {
111    }
112
113    public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
114        super(ds);
115        this.wireFormat = wireFormat;
116    }
117
118    @Override
119    public Set<ActiveMQDestination> getDestinations() {
120        TransactionContext c = null;
121        try {
122            c = getTransactionContext();
123            return getAdapter().doGetDestinations(c);
124        } catch (IOException e) {
125            return emptyDestinationSet();
126        } catch (SQLException e) {
127            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
128            return emptyDestinationSet();
129        } finally {
130            if (c != null) {
131                try {
132                    c.close();
133                } catch (Throwable e) {
134                }
135            }
136        }
137    }
138
139    @SuppressWarnings("unchecked")
140    private Set<ActiveMQDestination> emptyDestinationSet() {
141        return Collections.EMPTY_SET;
142    }
143
144    protected void createMessageAudit() {
145        if (enableAudit && audit == null) {
146            audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
147            TransactionContext c = null;
148
149            try {
150                c = getTransactionContext();
151                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
152                    @Override
153                    public void messageId(MessageId id) {
154                        audit.isDuplicate(id);
155                    }
156                });
157            } catch (Exception e) {
158                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
159            } finally {
160                if (c != null) {
161                    try {
162                        c.close();
163                    } catch (Throwable e) {
164                    }
165                }
166            }
167        }
168    }
169
170    public void initSequenceIdGenerator() {
171        TransactionContext c = null;
172        try {
173            c = getTransactionContext();
174            getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
175                @Override
176                public void messageId(MessageId id) {
177                    audit.isDuplicate(id);
178                }
179            });
180        } catch (Exception e) {
181            LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
182        } finally {
183            if (c != null) {
184                try {
185                    c.close();
186                } catch (Throwable e) {
187                }
188            }
189        }
190    }
191
192    @Override
193    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
194        MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
195        if (transactionStore != null) {
196            rc = transactionStore.proxy(rc);
197        }
198        return rc;
199    }
200
201    @Override
202    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
203        TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
204        if (transactionStore != null) {
205            rc = transactionStore.proxy(rc);
206        }
207        return rc;
208    }
209
210    /**
211     * Cleanup method to remove any state associated with the given destination
212     * @param destination Destination to forget
213     */
214    @Override
215    public void removeQueueMessageStore(ActiveMQQueue destination) {
216        if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) {
217            try {
218                removeConsumerDestination(destination);
219            } catch (IOException ioe) {
220                LOG.error("Failed to remove consumer destination: " + destination, ioe);
221            }
222        }
223    }
224
225    private void removeConsumerDestination(ActiveMQQueue destination) throws IOException {
226        TransactionContext c = getTransactionContext();
227        try {
228            String id = destination.getQualifiedName();
229            getAdapter().doDeleteSubscription(c, destination, id, id);
230        } catch (SQLException e) {
231            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
232            throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e);
233        } finally {
234            c.close();
235        }
236    }
237
238    /**
239     * Cleanup method to remove any state associated with the given destination
240     * No state retained.... nothing to do
241     *
242     * @param destination Destination to forget
243     */
244    @Override
245    public void removeTopicMessageStore(ActiveMQTopic destination) {
246    }
247
248    @Override
249    public TransactionStore createTransactionStore() throws IOException {
250        if (transactionStore == null) {
251            transactionStore = new JdbcMemoryTransactionStore(this);
252        }
253        return this.transactionStore;
254    }
255
256    @Override
257    public long getLastMessageBrokerSequenceId() throws IOException {
258        TransactionContext c = getTransactionContext();
259        try {
260            long seq =  getAdapter().doGetLastMessageStoreSequenceId(c);
261            sequenceGenerator.setLastSequenceId(seq);
262            long brokerSeq = 0;
263            if (seq != 0) {
264                byte[] msg = getAdapter().doGetMessageById(c, seq);
265                if (msg != null) {
266                    Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
267                    brokerSeq = last.getMessageId().getBrokerSequenceId();
268                } else {
269                   LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
270                }
271            }
272            return brokerSeq;
273        } catch (SQLException e) {
274            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
275            throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
276        } finally {
277            c.close();
278        }
279    }
280
281    @Override
282    public long getLastProducerSequenceId(ProducerId id) throws IOException {
283        TransactionContext c = getTransactionContext();
284        try {
285            return getAdapter().doGetLastProducerSequenceId(c, id);
286        } catch (SQLException e) {
287            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
288            throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
289        } finally {
290            c.close();
291        }
292    }
293
294    @Override
295    public void allowIOResumption() {}
296
297    @Override
298    public void init() throws Exception {
299        getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
300
301        if (isCreateTablesOnStartup()) {
302            TransactionContext transactionContext = getTransactionContext();
303            transactionContext.getExclusiveConnection();
304            transactionContext.begin();
305            try {
306                try {
307                    getAdapter().doCreateTables(transactionContext);
308                } catch (SQLException e) {
309                    LOG.warn("Cannot create tables due to: " + e);
310                    JDBCPersistenceAdapter.log("Failure Details: ", e);
311                }
312            } finally {
313                transactionContext.commit();
314            }
315        }
316    }
317
318    @Override
319    public void doStart() throws Exception {
320
321        if( brokerService!=null ) {
322          wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
323        }
324
325        // Cleanup the db periodically.
326        if (cleanupPeriod > 0) {
327            cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
328                @Override
329                public void run() {
330                    cleanup();
331                }
332            }, 0, cleanupPeriod, TimeUnit.MILLISECONDS);
333        }
334        createMessageAudit();
335    }
336
337    @Override
338    public synchronized void doStop(ServiceStopper stopper) throws Exception {
339        if (cleanupTicket != null) {
340            cleanupTicket.cancel(true);
341            cleanupTicket = null;
342        }
343        closeDataSource(getDataSource());
344    }
345
346    public void cleanup() {
347        TransactionContext c = null;
348        try {
349            LOG.debug("Cleaning up old messages.");
350            c = getTransactionContext();
351            c.getExclusiveConnection();
352            getAdapter().doDeleteOldMessages(c);
353        } catch (IOException e) {
354            LOG.warn("Old message cleanup failed due to: " + e, e);
355        } catch (SQLException e) {
356            LOG.warn("Old message cleanup failed due to: " + e);
357            JDBCPersistenceAdapter.log("Failure Details: ", e);
358        } finally {
359            if (c != null) {
360                try {
361                    c.close();
362                } catch (Throwable e) {
363                }
364            }
365            LOG.debug("Cleanup done.");
366        }
367    }
368
369    @Override
370    public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
371        if (clockDaemon == null) {
372            clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
373                @Override
374                public Thread newThread(Runnable runnable) {
375                    Thread thread = new Thread(runnable, "ActiveMQ JDBC PA Scheduled Task");
376                    thread.setDaemon(true);
377                    return thread;
378                }
379            });
380        }
381        return clockDaemon;
382    }
383
384    public JDBCAdapter getAdapter() throws IOException {
385        if (adapter == null) {
386            setAdapter(createAdapter());
387        }
388        return adapter;
389    }
390
391    /**
392     * @deprecated as of 5.7.0, replaced by {@link #getLocker()}
393     */
394    @Deprecated
395    public Locker getDatabaseLocker() throws IOException {
396        return getLocker();
397    }
398
399    /**
400     * Sets the database locker strategy to use to lock the database on startup
401     * @throws IOException
402     *
403     * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)}
404     */
405    @Deprecated
406    public void setDatabaseLocker(Locker locker) throws IOException {
407        setLocker(locker);
408    }
409
410    public DataSource getLockDataSource() throws IOException {
411        if (lockDataSource == null) {
412            lockDataSource = getDataSource();
413            if (lockDataSource == null) {
414                throw new IllegalArgumentException(
415                        "No dataSource property has been configured");
416            }
417        }
418        return lockDataSource;
419    }
420
421    public void setLockDataSource(DataSource dataSource) {
422        this.lockDataSource = dataSource;
423        LOG.info("Using a separate dataSource for locking: "
424                            + lockDataSource);
425    }
426
427    @Override
428    public BrokerService getBrokerService() {
429        return brokerService;
430    }
431
432    /**
433     * @throws IOException
434     */
435    protected JDBCAdapter createAdapter() throws IOException {
436
437        adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
438
439        // Use the default JDBC adapter if the
440        // Database type is not recognized.
441        if (adapter == null) {
442            adapter = new DefaultJDBCAdapter();
443            LOG.debug("Using default JDBC Adapter: " + adapter);
444        }
445        return adapter;
446    }
447
448    private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
449        Object adapter = null;
450        TransactionContext c = getTransactionContext();
451        try {
452            try {
453                // Make the filename file system safe.
454                String dirverName = c.getConnection().getMetaData().getDriverName();
455                dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(Locale.ENGLISH);
456
457                try {
458                    adapter = finder.newInstance(dirverName);
459                    LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
460                } catch (Throwable e) {
461                    LOG.info("Database " + kind + " driver override not found for : [" + dirverName
462                             + "].  Will use default implementation.");
463                }
464            } catch (SQLException e) {
465                LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
466                          + e.getMessage());
467                JDBCPersistenceAdapter.log("Failure Details: ", e);
468            }
469        } finally {
470            c.close();
471        }
472        return adapter;
473    }
474
475    public void setAdapter(JDBCAdapter adapter) {
476        this.adapter = adapter;
477        this.adapter.setStatements(getStatements());
478        this.adapter.setMaxRows(getMaxRows());
479    }
480
481    public WireFormat getWireFormat() {
482        return wireFormat;
483    }
484
485    public void setWireFormat(WireFormat wireFormat) {
486        this.wireFormat = wireFormat;
487    }
488
489    public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
490        if (context == null || isBrokerContext(context)) {
491            return getTransactionContext();
492        } else {
493            TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
494            if (answer == null) {
495                answer = getTransactionContext();
496                context.setLongTermStoreContext(answer);
497            }
498            return answer;
499        }
500    }
501
502    private boolean isBrokerContext(ConnectionContext context) {
503        return context.getSecurityContext() != null && context.getSecurityContext().isBrokerContext();
504    }
505
506    public TransactionContext getTransactionContext() throws IOException {
507        TransactionContext answer = new TransactionContext(this);
508        if (transactionIsolation > 0) {
509            answer.setTransactionIsolation(transactionIsolation);
510        }
511        return answer;
512    }
513
514    @Override
515    public void beginTransaction(ConnectionContext context) throws IOException {
516        TransactionContext transactionContext = getTransactionContext(context);
517        transactionContext.begin();
518    }
519
520    @Override
521    public void commitTransaction(ConnectionContext context) throws IOException {
522        TransactionContext transactionContext = getTransactionContext(context);
523        transactionContext.commit();
524    }
525
526    @Override
527    public void rollbackTransaction(ConnectionContext context) throws IOException {
528        TransactionContext transactionContext = getTransactionContext(context);
529        transactionContext.rollback();
530    }
531
532    public int getCleanupPeriod() {
533        return cleanupPeriod;
534    }
535
536    /**
537     * Sets the number of milliseconds until the database is attempted to be
538     * cleaned up for durable topics
539     */
540    public void setCleanupPeriod(int cleanupPeriod) {
541        this.cleanupPeriod = cleanupPeriod;
542    }
543
544    public boolean isChangeAutoCommitAllowed() {
545        return changeAutoCommitAllowed;
546    }
547
548    /**
549     * Whether the JDBC driver allows to set the auto commit.
550     * Some drivers does not allow changing the auto commit. The default value is true.
551     *
552     * @param changeAutoCommitAllowed true to change, false to not change.
553     */
554    public void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed) {
555        this.changeAutoCommitAllowed = changeAutoCommitAllowed;
556    }
557
558    @Override
559    public void deleteAllMessages() throws IOException {
560        TransactionContext c = getTransactionContext();
561        c.getExclusiveConnection();
562        try {
563            getAdapter().doDropTables(c);
564            getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
565            getAdapter().doCreateTables(c);
566            LOG.info("Persistence store purged.");
567        } catch (SQLException e) {
568            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
569            throw IOExceptionSupport.create(e);
570        } finally {
571            c.close();
572        }
573    }
574
575    public boolean isUseExternalMessageReferences() {
576        return useExternalMessageReferences;
577    }
578
579    public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
580        this.useExternalMessageReferences = useExternalMessageReferences;
581    }
582
583    public boolean isCreateTablesOnStartup() {
584        return createTablesOnStartup;
585    }
586
587    /**
588     * Sets whether or not tables are created on startup
589     */
590    public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
591        this.createTablesOnStartup = createTablesOnStartup;
592    }
593
594    /**
595     * @deprecated use {@link #setUseLock(boolean)} instead
596     *
597     * Sets whether or not an exclusive database lock should be used to enable
598     * JDBC Master/Slave. Enabled by default.
599     */
600    @Deprecated
601    public void setUseDatabaseLock(boolean useDatabaseLock) {
602        setUseLock(useDatabaseLock);
603    }
604
605    public static void log(String msg, SQLException e) {
606        String s = msg + e.getMessage();
607        while (e.getNextException() != null) {
608            e = e.getNextException();
609            s += ", due to: " + e.getMessage();
610        }
611        LOG.warn(s, e);
612    }
613
614    public Statements getStatements() {
615        if (statements == null) {
616            statements = new Statements();
617        }
618        return statements;
619    }
620
621    public void setStatements(Statements statements) {
622        this.statements = statements;
623        if (adapter != null) {
624            this.adapter.setStatements(getStatements());
625        }
626    }
627
628    /**
629     * @param usageManager The UsageManager that is controlling the
630     *                destination's memory usage.
631     */
632    @Override
633    public void setUsageManager(SystemUsage usageManager) {
634    }
635
636    @Override
637    public Locker createDefaultLocker() throws IOException {
638        Locker locker = (Locker) loadAdapter(lockFactoryFinder, "lock");
639        if (locker == null) {
640            locker = new DefaultDatabaseLocker();
641            LOG.debug("Using default JDBC Locker: " + locker);
642        }
643        locker.configure(this);
644        return locker;
645    }
646
647    @Override
648    public void setBrokerName(String brokerName) {
649    }
650
651    @Override
652    public String toString() {
653        return "JDBCPersistenceAdapter(" + super.toString() + ")";
654    }
655
656    @Override
657    public void setDirectory(File dir) {
658        this.directory=dir;
659    }
660
661    @Override
662    public File getDirectory(){
663        if (this.directory==null && brokerService != null){
664            this.directory=brokerService.getBrokerDataDirectory();
665        }
666        return this.directory;
667    }
668
669    // interesting bit here is proof that DB is ok
670    @Override
671    public void checkpoint(boolean sync) throws IOException {
672        // by pass TransactionContext to avoid IO Exception handler
673        Connection connection = null;
674        try {
675            connection = getDataSource().getConnection();
676            if (!connection.isValid(10)) {
677                throw new IOException("isValid(10) failed for: " + connection);
678            }
679        } catch (SQLException e) {
680            LOG.debug("Could not get JDBC connection for checkpoint: " + e);
681            throw IOExceptionSupport.create(e);
682        } finally {
683            if (connection != null) {
684                try {
685                    connection.close();
686                } catch (Throwable ignored) {
687                }
688            }
689        }
690    }
691
692    @Override
693    public long size(){
694        return 0;
695    }
696
697    /**
698     * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
699     *
700     * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
701     * not applied if DataBaseLocker is injected.
702     *
703     */
704    @Deprecated
705    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException {
706        getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval);
707    }
708
709    /**
710     * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
711     * This allowable dirty isolation level may not be achievable in clustered DB environments
712     * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
713     * see isolation level constants in {@link java.sql.Connection}
714     * @param transactionIsolation the isolation level to use
715     */
716    public void setTransactionIsolation(int transactionIsolation) {
717        this.transactionIsolation = transactionIsolation;
718    }
719
720    public int getMaxProducersToAudit() {
721        return maxProducersToAudit;
722    }
723
724    public void setMaxProducersToAudit(int maxProducersToAudit) {
725        this.maxProducersToAudit = maxProducersToAudit;
726    }
727
728    public int getMaxAuditDepth() {
729        return maxAuditDepth;
730    }
731
732    public void setMaxAuditDepth(int maxAuditDepth) {
733        this.maxAuditDepth = maxAuditDepth;
734    }
735
736    public boolean isEnableAudit() {
737        return enableAudit;
738    }
739
740    public void setEnableAudit(boolean enableAudit) {
741        this.enableAudit = enableAudit;
742    }
743
744    public int getAuditRecoveryDepth() {
745        return auditRecoveryDepth;
746    }
747
748    public void setAuditRecoveryDepth(int auditRecoveryDepth) {
749        this.auditRecoveryDepth = auditRecoveryDepth;
750    }
751
752    public long getNextSequenceId() {
753        return sequenceGenerator.getNextSequenceId();
754    }
755
756    public int getMaxRows() {
757        return maxRows;
758    }
759
760    /*
761     * the max rows return from queries, with sparse selectors this may need to be increased
762     */
763    public void setMaxRows(int maxRows) {
764        this.maxRows = maxRows;
765    }
766
767    public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException {
768        TransactionContext c = getTransactionContext();
769        try {
770            getAdapter().doRecoverPreparedOps(c, jdbcMemoryTransactionStore);
771        } catch (SQLException e) {
772            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
773            throw IOExceptionSupport.create("Failed to recover from: " + jdbcMemoryTransactionStore + ". Reason: " + e,e);
774        } finally {
775            c.close();
776        }
777    }
778
779    public void commitAdd(ConnectionContext context, MessageId messageId, long preparedSequenceId) throws IOException {
780        TransactionContext c = getTransactionContext(context);
781        try {
782            long sequence = (Long)messageId.getEntryLocator();
783            getAdapter().doCommitAddOp(c, preparedSequenceId, sequence);
784        } catch (SQLException e) {
785            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
786            throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);
787        } finally {
788            c.close();
789        }
790    }
791
792    public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException {
793        TransactionContext c = getTransactionContext(context);
794        try {
795            getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getFutureOrSequenceLong(), null);
796        } catch (SQLException e) {
797            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
798            throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e);
799        } finally {
800            c.close();
801        }
802    }
803
804    public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
805        TransactionContext c = getTransactionContext(context);
806        try {
807            getAdapter().doSetLastAck(c, destination, null, clientId, subName, xidLastAck, priority);
808        } catch (SQLException e) {
809            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
810            throw IOExceptionSupport.create("Failed to commit last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e,e);
811        } finally {
812            c.close();
813        }
814    }
815
816    public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException {
817        TransactionContext c = getTransactionContext(context);
818        try {
819            byte priority = (byte) store.getCachedStoreSequenceId(c, store.getDestination(), ack.getLastMessageId())[1];
820            getAdapter().doClearLastAck(c, store.getDestination(), priority, clientId, subName);
821        } catch (SQLException e) {
822            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
823            throw IOExceptionSupport.create("Failed to rollback last ack: " + ack + " on " +  store.getDestination() + " for " + subName + ":" + clientId + ". Reason: " + e,e);
824        } finally {
825            c.close();
826        }
827    }
828
829    // after recovery there is no record of the original messageId for the ack
830    public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
831        TransactionContext c = getTransactionContext(context);
832        try {
833            getAdapter().doClearLastAck(c, destination, priority, clientId, subName);
834        } catch (SQLException e) {
835            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
836            throw IOExceptionSupport.create("Failed to rollback last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e, e);
837        } finally {
838            c.close();
839        }
840    }
841
842    long[] getStoreSequenceIdForMessageId(ConnectionContext context, MessageId messageId, ActiveMQDestination destination) throws IOException {
843        long[] result = new long[]{-1, Byte.MAX_VALUE -1};
844        TransactionContext c = getTransactionContext(context);
845        try {
846            result = adapter.getStoreSequenceId(c, destination, messageId);
847        } catch (SQLException e) {
848            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
849            throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
850        } finally {
851            c.close();
852        }
853        return result;
854    }
855
856    @Override
857    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
858        throw new UnsupportedOperationException();
859    }
860}