001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.sql.Connection;
022    import java.sql.SQLException;
023    import java.util.Collections;
024    import java.util.Locale;
025    import java.util.Set;
026    import java.util.concurrent.ScheduledFuture;
027    import java.util.concurrent.ScheduledThreadPoolExecutor;
028    import java.util.concurrent.ThreadFactory;
029    import java.util.concurrent.TimeUnit;
030    
031    import javax.sql.DataSource;
032    
033    import org.apache.activemq.ActiveMQMessageAudit;
034    import org.apache.activemq.broker.BrokerService;
035    import org.apache.activemq.broker.BrokerServiceAware;
036    import org.apache.activemq.broker.ConnectionContext;
037    import org.apache.activemq.command.ActiveMQDestination;
038    import org.apache.activemq.command.ActiveMQQueue;
039    import org.apache.activemq.command.ActiveMQTopic;
040    import org.apache.activemq.command.Message;
041    import org.apache.activemq.command.MessageAck;
042    import org.apache.activemq.command.MessageId;
043    import org.apache.activemq.command.ProducerId;
044    import org.apache.activemq.openwire.OpenWireFormat;
045    import org.apache.activemq.broker.Locker;
046    import org.apache.activemq.store.MessageStore;
047    import org.apache.activemq.store.PersistenceAdapter;
048    import org.apache.activemq.store.TopicMessageStore;
049    import org.apache.activemq.store.TransactionStore;
050    import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
051    import org.apache.activemq.store.memory.MemoryTransactionStore;
052    import org.apache.activemq.usage.SystemUsage;
053    import org.apache.activemq.util.ByteSequence;
054    import org.apache.activemq.util.FactoryFinder;
055    import org.apache.activemq.util.IOExceptionSupport;
056    import org.apache.activemq.util.LongSequenceGenerator;
057    import org.apache.activemq.util.ServiceStopper;
058    import org.apache.activemq.wireformat.WireFormat;
059    import org.slf4j.Logger;
060    import org.slf4j.LoggerFactory;
061    
062    /**
063     * A {@link PersistenceAdapter} implementation using JDBC for persistence
064     * storage.
065     * 
066     * This persistence adapter will correctly remember prepared XA transactions,
067     * but it will not keep track of local transaction commits so that operations
068     * performed against the Message store are done as a single uow.
069     * 
070     * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
071     * 
072     * 
073     */
074    public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter {
075    
076        private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
077        private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
078                                                                       "META-INF/services/org/apache/activemq/store/jdbc/");
079        private static FactoryFinder lockFactoryFinder = new FactoryFinder(
080                                                                        "META-INF/services/org/apache/activemq/store/jdbc/lock/");
081    
082        public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000;
083    
084        private WireFormat wireFormat = new OpenWireFormat();
085        private Statements statements;
086        private JDBCAdapter adapter;
087        private MemoryTransactionStore transactionStore;
088        private ScheduledThreadPoolExecutor clockDaemon;
089        private ScheduledFuture<?> cleanupTicket;
090        private int cleanupPeriod = 1000 * 60 * 5;
091        private boolean useExternalMessageReferences;
092        private boolean createTablesOnStartup = true;
093        private DataSource lockDataSource;
094        private int transactionIsolation;
095        private File directory;
096        private boolean changeAutoCommitAllowed = true;
097        
098        protected int maxProducersToAudit=1024;
099        protected int maxAuditDepth=1000;
100        protected boolean enableAudit=false;
101        protected int auditRecoveryDepth = 1024;
102        protected ActiveMQMessageAudit audit;
103        
104        protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
105        protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
106    
107        {
108            setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD);
109        }
110    
111        public JDBCPersistenceAdapter() {
112        }
113    
114        public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
115            super(ds);
116            this.wireFormat = wireFormat;
117        }
118    
119        public Set<ActiveMQDestination> getDestinations() {
120            TransactionContext c = null;
121            try {
122                c = getTransactionContext();
123                return getAdapter().doGetDestinations(c);
124            } catch (IOException e) {
125                return emptyDestinationSet();
126            } catch (SQLException e) {
127                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
128                return emptyDestinationSet();
129            } finally {
130                if (c != null) {
131                    try {
132                        c.close();
133                    } catch (Throwable e) {
134                    }
135                }
136            }
137        }
138    
139        @SuppressWarnings("unchecked")
140        private Set<ActiveMQDestination> emptyDestinationSet() {
141            return Collections.EMPTY_SET;
142        }
143        
144        protected void createMessageAudit() {
145            if (enableAudit && audit == null) {
146                audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
147                TransactionContext c = null;
148                
149                try {
150                    c = getTransactionContext();
151                    getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
152                        public void messageId(MessageId id) {
153                            audit.isDuplicate(id);
154                        }
155                    });
156                } catch (Exception e) {
157                    LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
158                } finally {
159                    if (c != null) {
160                        try {
161                            c.close();
162                        } catch (Throwable e) {
163                        }
164                    }
165                }
166            }
167        }
168        
169        public void initSequenceIdGenerator() {
170            TransactionContext c = null;
171            try {
172                c = getTransactionContext();
173                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
174                    public void messageId(MessageId id) {
175                        audit.isDuplicate(id);
176                    }
177                });
178            } catch (Exception e) {
179                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
180            } finally {
181                if (c != null) {
182                    try {
183                        c.close();
184                    } catch (Throwable e) {
185                    }
186                }
187            }
188            
189        }
190    
191        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
192            MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
193            if (transactionStore != null) {
194                rc = transactionStore.proxy(rc);
195            }
196            return rc;
197        }
198    
199        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
200            TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
201            if (transactionStore != null) {
202                rc = transactionStore.proxy(rc);
203            }
204            return rc;
205        }
206    
207        /**
208         * Cleanup method to remove any state associated with the given destination
209         * @param destination Destination to forget
210         */
211        public void removeQueueMessageStore(ActiveMQQueue destination) {
212            if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) {
213                try {
214                    removeConsumerDestination(destination);
215                } catch (IOException ioe) {
216                    LOG.error("Failed to remove consumer destination: " + destination, ioe);
217                }
218            }
219        }
220    
221        private void removeConsumerDestination(ActiveMQQueue destination) throws IOException {
222            TransactionContext c = getTransactionContext();
223            try {
224                String id = destination.getQualifiedName();
225                getAdapter().doDeleteSubscription(c, destination, id, id);
226            } catch (SQLException e) {
227                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
228                throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e);
229            } finally {
230                c.close();
231            }
232        }
233    
234        /**
235         * Cleanup method to remove any state associated with the given destination
236         * No state retained.... nothing to do
237         *
238         * @param destination Destination to forget
239         */
240        public void removeTopicMessageStore(ActiveMQTopic destination) {
241        }
242    
243        public TransactionStore createTransactionStore() throws IOException {
244            if (transactionStore == null) {
245                transactionStore = new JdbcMemoryTransactionStore(this);
246            }
247            return this.transactionStore;
248        }
249    
250        public long getLastMessageBrokerSequenceId() throws IOException {
251            TransactionContext c = getTransactionContext();
252            try {
253                long seq =  getAdapter().doGetLastMessageStoreSequenceId(c);
254                sequenceGenerator.setLastSequenceId(seq);
255                long brokerSeq = 0;
256                if (seq != 0) {
257                    byte[] msg = getAdapter().doGetMessageById(c, seq);
258                    if (msg != null) {
259                        Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
260                        brokerSeq = last.getMessageId().getBrokerSequenceId();
261                    } else {
262                       LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
263                    }
264                }
265                return brokerSeq;
266            } catch (SQLException e) {
267                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
268                throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
269            } finally {
270                c.close();
271            }
272        }
273        
274        public long getLastProducerSequenceId(ProducerId id) throws IOException {
275            TransactionContext c = getTransactionContext();
276            try {
277                return getAdapter().doGetLastProducerSequenceId(c, id);
278            } catch (SQLException e) {
279                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
280                throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
281            } finally {
282                c.close();
283            }
284        }
285    
286        @Override
287        public void init() throws Exception {
288            getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
289    
290            if (isCreateTablesOnStartup()) {
291                TransactionContext transactionContext = getTransactionContext();
292                transactionContext.begin();
293                try {
294                    try {
295                        getAdapter().doCreateTables(transactionContext);
296                    } catch (SQLException e) {
297                        LOG.warn("Cannot create tables due to: " + e);
298                        JDBCPersistenceAdapter.log("Failure Details: ", e);
299                    }
300                } finally {
301                    transactionContext.commit();
302                }
303            }
304        }
305    
306        public void doStart() throws Exception {
307            // Cleanup the db periodically.
308            if (cleanupPeriod > 0) {
309                cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
310                    public void run() {
311                        cleanup();
312                    }
313                }, 0, cleanupPeriod, TimeUnit.MILLISECONDS);
314            }
315            createMessageAudit();
316        }
317    
318        public synchronized void doStop(ServiceStopper stopper) throws Exception {
319            if (cleanupTicket != null) {
320                cleanupTicket.cancel(true);
321                cleanupTicket = null;
322            }
323        }
324    
325        public void cleanup() {
326            TransactionContext c = null;
327            try {
328                LOG.debug("Cleaning up old messages.");
329                c = getTransactionContext();
330                getAdapter().doDeleteOldMessages(c);
331            } catch (IOException e) {
332                LOG.warn("Old message cleanup failed due to: " + e, e);
333            } catch (SQLException e) {
334                LOG.warn("Old message cleanup failed due to: " + e);
335                JDBCPersistenceAdapter.log("Failure Details: ", e);
336            } finally {
337                if (c != null) {
338                    try {
339                        c.close();
340                    } catch (Throwable e) {
341                    }
342                }
343                LOG.debug("Cleanup done.");
344            }
345        }
346    
347        public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
348            this.clockDaemon = clockDaemon;
349        }
350    
351        public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
352            if (clockDaemon == null) {
353                clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
354                    public Thread newThread(Runnable runnable) {
355                        Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
356                        thread.setDaemon(true);
357                        return thread;
358                    }
359                });
360            }
361            return clockDaemon;
362        }
363    
364        public JDBCAdapter getAdapter() throws IOException {
365            if (adapter == null) {
366                setAdapter(createAdapter());
367            }
368            return adapter;
369        }
370    
371        /**
372         *
373         * @deprecated as of 5.7.0, replaced by {@link #getLocker()}
374         */
375        @Deprecated
376        public Locker getDatabaseLocker() throws IOException {
377            return getLocker();
378        }
379    
380        /**
381         * Sets the database locker strategy to use to lock the database on startup
382         * @throws IOException
383         *
384         * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)}
385         */
386        public void setDatabaseLocker(Locker locker) throws IOException {
387            setLocker(locker);
388        }
389    
390        public DataSource getLockDataSource() throws IOException {
391            if (lockDataSource == null) {
392                lockDataSource = getDataSource();
393                if (lockDataSource == null) {
394                    throw new IllegalArgumentException(
395                            "No dataSource property has been configured");
396                }
397            } else {
398                LOG.info("Using a separate dataSource for locking: "
399                        + lockDataSource);
400            }
401            return lockDataSource;
402        }
403        
404        public void setLockDataSource(DataSource dataSource) {
405            this.lockDataSource = dataSource;
406        }
407    
408        public BrokerService getBrokerService() {
409            return brokerService;
410        }
411    
412        /**
413         * @throws IOException
414         */
415        protected JDBCAdapter createAdapter() throws IOException {
416           
417            adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
418           
419            // Use the default JDBC adapter if the
420            // Database type is not recognized.
421            if (adapter == null) {
422                adapter = new DefaultJDBCAdapter();
423                LOG.debug("Using default JDBC Adapter: " + adapter);
424            }
425            return adapter;
426        }
427    
428        private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
429            Object adapter = null;
430            TransactionContext c = getTransactionContext();
431            try {
432                try {
433                    // Make the filename file system safe.
434                    String dirverName = c.getConnection().getMetaData().getDriverName();
435                    dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(Locale.ENGLISH);
436    
437                    try {
438                        adapter = finder.newInstance(dirverName);
439                        LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
440                    } catch (Throwable e) {
441                        LOG.info("Database " + kind + " driver override not found for : [" + dirverName
442                                 + "].  Will use default implementation.");
443                    }
444                } catch (SQLException e) {
445                    LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
446                              + e.getMessage());
447                    JDBCPersistenceAdapter.log("Failure Details: ", e);
448                }
449            } finally {
450                c.close();
451            }
452            return adapter;
453        }
454    
455        public void setAdapter(JDBCAdapter adapter) {
456            this.adapter = adapter;
457            this.adapter.setStatements(getStatements());
458            this.adapter.setMaxRows(getMaxRows());
459        }
460    
461        public WireFormat getWireFormat() {
462            return wireFormat;
463        }
464    
465        public void setWireFormat(WireFormat wireFormat) {
466            this.wireFormat = wireFormat;
467        }
468    
469        public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
470            if (context == null) {
471                return getTransactionContext();
472            } else {
473                TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
474                if (answer == null) {
475                    answer = getTransactionContext();
476                    context.setLongTermStoreContext(answer);
477                }
478                return answer;
479            }
480        }
481    
482        public TransactionContext getTransactionContext() throws IOException {
483            TransactionContext answer = new TransactionContext(this);
484            if (transactionIsolation > 0) {
485                answer.setTransactionIsolation(transactionIsolation);
486            }
487            return answer;
488        }
489    
490        public void beginTransaction(ConnectionContext context) throws IOException {
491            TransactionContext transactionContext = getTransactionContext(context);
492            transactionContext.begin();
493        }
494    
495        public void commitTransaction(ConnectionContext context) throws IOException {
496            TransactionContext transactionContext = getTransactionContext(context);
497            transactionContext.commit();
498        }
499    
500        public void rollbackTransaction(ConnectionContext context) throws IOException {
501            TransactionContext transactionContext = getTransactionContext(context);
502            transactionContext.rollback();
503        }
504    
505        public int getCleanupPeriod() {
506            return cleanupPeriod;
507        }
508    
509        /**
510         * Sets the number of milliseconds until the database is attempted to be
511         * cleaned up for durable topics
512         */
513        public void setCleanupPeriod(int cleanupPeriod) {
514            this.cleanupPeriod = cleanupPeriod;
515        }
516    
517        public boolean isChangeAutoCommitAllowed() {
518            return changeAutoCommitAllowed;
519        }
520    
521        /**
522         * Whether the JDBC driver allows to set the auto commit.
523         * Some drivers does not allow changing the auto commit. The default value is true.
524         *
525         * @param changeAutoCommitAllowed true to change, false to not change.
526         */
527        public void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed) {
528            this.changeAutoCommitAllowed = changeAutoCommitAllowed;
529        }
530    
531        public void deleteAllMessages() throws IOException {
532            TransactionContext c = getTransactionContext();
533            try {
534                getAdapter().doDropTables(c);
535                getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
536                getAdapter().doCreateTables(c);
537                LOG.info("Persistence store purged.");
538            } catch (SQLException e) {
539                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
540                throw IOExceptionSupport.create(e);
541            } finally {
542                c.close();
543            }
544        }
545    
546        public boolean isUseExternalMessageReferences() {
547            return useExternalMessageReferences;
548        }
549    
550        public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
551            this.useExternalMessageReferences = useExternalMessageReferences;
552        }
553    
554        public boolean isCreateTablesOnStartup() {
555            return createTablesOnStartup;
556        }
557    
558        /**
559         * Sets whether or not tables are created on startup
560         */
561        public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
562            this.createTablesOnStartup = createTablesOnStartup;
563        }
564    
565        /**
566         * @deprecated use {@link #setUseLock(boolean)} instead
567         *
568         * Sets whether or not an exclusive database lock should be used to enable
569         * JDBC Master/Slave. Enabled by default.
570         */
571        @Deprecated
572        public void setUseDatabaseLock(boolean useDatabaseLock) {
573            setUseLock(useDatabaseLock);
574        }
575    
576        public static void log(String msg, SQLException e) {
577            String s = msg + e.getMessage();
578            while (e.getNextException() != null) {
579                e = e.getNextException();
580                s += ", due to: " + e.getMessage();
581            }
582            LOG.warn(s, e);
583        }
584    
585        public Statements getStatements() {
586            if (statements == null) {
587                statements = new Statements();
588            }
589            return statements;
590        }
591    
592        public void setStatements(Statements statements) {
593            this.statements = statements;
594        }
595    
596        /**
597         * @param usageManager The UsageManager that is controlling the
598         *                destination's memory usage.
599         */
600        public void setUsageManager(SystemUsage usageManager) {
601        }
602    
603        public Locker createDefaultLocker() throws IOException {
604            Locker locker = (Locker) loadAdapter(lockFactoryFinder, "lock");
605            if (locker == null) {
606                locker = new DefaultDatabaseLocker();
607                LOG.debug("Using default JDBC Locker: " + locker);
608            }
609            locker.configure(this);
610            return locker;
611        }
612    
613        public void setBrokerName(String brokerName) {
614        }
615    
616        public String toString() {
617            return "JDBCPersistenceAdapter(" + super.toString() + ")";
618        }
619    
620        public void setDirectory(File dir) {
621            this.directory=dir;
622        }
623        
624        public File getDirectory(){
625            if (this.directory==null && brokerService != null){
626                this.directory=brokerService.getBrokerDataDirectory();
627            }
628            return this.directory;
629        }
630    
631        // interesting bit here is proof that DB is ok
632        public void checkpoint(boolean sync) throws IOException {
633            // by pass TransactionContext to avoid IO Exception handler
634            Connection connection = null;
635            try {
636                connection = getDataSource().getConnection();
637            } catch (SQLException e) {
638                LOG.debug("Could not get JDBC connection for checkpoint: " + e);
639                throw IOExceptionSupport.create(e);
640            } finally {
641                if (connection != null) {
642                    try {
643                        connection.close();
644                    } catch (Throwable ignored) {
645                    }
646                }
647            }
648        }
649    
650        public long size(){
651            return 0;
652        }
653    
654        /**
655         * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
656         *
657         * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
658         * not applied if DataBaseLocker is injected.
659         *
660         */
661        public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException {
662            getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval);
663        }
664        
665        /**
666         * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
667         * This allowable dirty isolation level may not be achievable in clustered DB environments
668         * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
669         * see isolation level constants in {@link java.sql.Connection}
670         * @param transactionIsolation the isolation level to use
671         */
672        public void setTransactionIsolation(int transactionIsolation) {
673            this.transactionIsolation = transactionIsolation;
674        }
675    
676            public int getMaxProducersToAudit() {
677                    return maxProducersToAudit;
678            }
679    
680            public void setMaxProducersToAudit(int maxProducersToAudit) {
681                    this.maxProducersToAudit = maxProducersToAudit;
682            }
683    
684            public int getMaxAuditDepth() {
685                    return maxAuditDepth;
686            }
687    
688            public void setMaxAuditDepth(int maxAuditDepth) {
689                    this.maxAuditDepth = maxAuditDepth;
690            }
691    
692            public boolean isEnableAudit() {
693                    return enableAudit;
694            }
695    
696            public void setEnableAudit(boolean enableAudit) {
697                    this.enableAudit = enableAudit;
698            }
699    
700        public int getAuditRecoveryDepth() {
701            return auditRecoveryDepth;
702        }
703    
704        public void setAuditRecoveryDepth(int auditRecoveryDepth) {
705            this.auditRecoveryDepth = auditRecoveryDepth;
706        }
707    
708        public long getNextSequenceId() {
709            synchronized(sequenceGenerator) {
710                return sequenceGenerator.getNextSequenceId();
711            }
712        }
713    
714        public int getMaxRows() {
715            return maxRows;
716        }
717    
718        /*
719         * the max rows return from queries, with sparse selectors this may need to be increased
720         */
721        public void setMaxRows(int maxRows) {
722            this.maxRows = maxRows;
723        }
724    
725        public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException {
726            TransactionContext c = getTransactionContext();
727            try {
728                getAdapter().doRecoverPreparedOps(c, jdbcMemoryTransactionStore);
729            } catch (SQLException e) {
730                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
731                throw IOExceptionSupport.create("Failed to recover from: " + jdbcMemoryTransactionStore + ". Reason: " + e,e);
732            } finally {
733                c.close();
734            }
735        }
736    
737        public void commitAdd(ConnectionContext context, MessageId messageId) throws IOException {
738            TransactionContext c = getTransactionContext(context);
739            try {
740                long sequence = (Long)messageId.getDataLocator();
741                getAdapter().doCommitAddOp(c, sequence);
742            } catch (SQLException e) {
743                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
744                throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);
745            } finally {
746                c.close();
747            }
748        }
749    
750        public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException {
751            TransactionContext c = getTransactionContext(context);
752            try {
753                getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getDataLocator(), null);
754            } catch (SQLException e) {
755                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
756                throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e);
757            } finally {
758                c.close();
759            }
760        }
761    
762    
763        public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
764            TransactionContext c = getTransactionContext(context);
765            try {
766                getAdapter().doSetLastAck(c, destination, null, clientId, subName, xidLastAck, priority);
767            } catch (SQLException e) {
768                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
769                throw IOExceptionSupport.create("Failed to commit last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e,e);
770            } finally {
771                c.close();
772            }
773        }
774    
775        public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException {
776            TransactionContext c = getTransactionContext(context);
777            try {
778                byte priority = (byte) store.getCachedStoreSequenceId(c, store.getDestination(), ack.getLastMessageId())[1];
779                getAdapter().doClearLastAck(c, store.getDestination(), priority, clientId, subName);
780            } catch (SQLException e) {
781                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
782                throw IOExceptionSupport.create("Failed to rollback last ack: " + ack + " on " +  store.getDestination() + " for " + subName + ":" + clientId + ". Reason: " + e,e);
783            } finally {
784                c.close();
785            }
786        }
787    
788        // after recovery there is no record of the original messageId for the ack
789        public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
790            TransactionContext c = getTransactionContext(context);
791            try {
792                getAdapter().doClearLastAck(c, destination, priority, clientId, subName);
793            } catch (SQLException e) {
794                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
795                throw IOExceptionSupport.create("Failed to rollback last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e, e);
796            } finally {
797                c.close();
798            }
799        }
800    
801        long[] getStoreSequenceIdForMessageId(MessageId messageId, ActiveMQDestination destination) throws IOException {
802            long[] result = new long[]{-1, Byte.MAX_VALUE -1};
803            TransactionContext c = getTransactionContext();
804            try {
805                result = adapter.getStoreSequenceId(c, destination, messageId);
806            } catch (SQLException e) {
807                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
808                throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
809            } finally {
810                c.close();
811            }
812            return result;
813        }
814    
815    }