001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.activemq.store.jdbc;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.sql.Connection;
022    import java.sql.SQLException;
023    import java.util.Collections;
024    import java.util.Locale;
025    import java.util.Set;
026    import java.util.concurrent.ScheduledFuture;
027    import java.util.concurrent.ScheduledThreadPoolExecutor;
028    import java.util.concurrent.ThreadFactory;
029    import java.util.concurrent.TimeUnit;
030    
031    import javax.sql.DataSource;
032    
033    import org.apache.activemq.ActiveMQMessageAudit;
034    import org.apache.activemq.broker.BrokerService;
035    import org.apache.activemq.broker.BrokerServiceAware;
036    import org.apache.activemq.broker.ConnectionContext;
037    import org.apache.activemq.command.ActiveMQDestination;
038    import org.apache.activemq.command.ActiveMQQueue;
039    import org.apache.activemq.command.ActiveMQTopic;
040    import org.apache.activemq.command.Message;
041    import org.apache.activemq.command.MessageAck;
042    import org.apache.activemq.command.MessageId;
043    import org.apache.activemq.command.ProducerId;
044    import org.apache.activemq.openwire.OpenWireFormat;
045    import org.apache.activemq.broker.Locker;
046    import org.apache.activemq.store.MessageStore;
047    import org.apache.activemq.store.PersistenceAdapter;
048    import org.apache.activemq.store.TopicMessageStore;
049    import org.apache.activemq.store.TransactionStore;
050    import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
051    import org.apache.activemq.store.memory.MemoryTransactionStore;
052    import org.apache.activemq.usage.SystemUsage;
053    import org.apache.activemq.util.ByteSequence;
054    import org.apache.activemq.util.FactoryFinder;
055    import org.apache.activemq.util.IOExceptionSupport;
056    import org.apache.activemq.util.LongSequenceGenerator;
057    import org.apache.activemq.util.ServiceStopper;
058    import org.apache.activemq.wireformat.WireFormat;
059    import org.slf4j.Logger;
060    import org.slf4j.LoggerFactory;
061    
062    /**
063     * A {@link PersistenceAdapter} implementation using JDBC for persistence
064     * storage.
065     * 
066     * This persistence adapter will correctly remember prepared XA transactions,
067     * but it will not keep track of local transaction commits so that operations
068     * performed against the Message store are done as a single uow.
069     * 
070     * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
071     * 
072     * 
073     */
074    public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter,
075        BrokerServiceAware {
076    
077        private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
078        private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
079                                                                       "META-INF/services/org/apache/activemq/store/jdbc/");
080        private static FactoryFinder lockFactoryFinder = new FactoryFinder(
081                                                                        "META-INF/services/org/apache/activemq/store/jdbc/lock/");
082    
083        public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000;
084    
085        private WireFormat wireFormat = new OpenWireFormat();
086        private BrokerService brokerService;
087        private Statements statements;
088        private JDBCAdapter adapter;
089        private MemoryTransactionStore transactionStore;
090        private ScheduledThreadPoolExecutor clockDaemon;
091        private ScheduledFuture<?> cleanupTicket;
092        private int cleanupPeriod = 1000 * 60 * 5;
093        private boolean useExternalMessageReferences;
094        private boolean createTablesOnStartup = true;
095        private DataSource lockDataSource;
096        private int transactionIsolation;
097        private File directory;
098        
099        protected int maxProducersToAudit=1024;
100        protected int maxAuditDepth=1000;
101        protected boolean enableAudit=false;
102        protected int auditRecoveryDepth = 1024;
103        protected ActiveMQMessageAudit audit;
104        
105        protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
106        protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
107    
108        {
109            setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD);
110        }
111    
112        public JDBCPersistenceAdapter() {
113        }
114    
115        public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
116            super(ds);
117            this.wireFormat = wireFormat;
118        }
119    
120        public Set<ActiveMQDestination> getDestinations() {
121            TransactionContext c = null;
122            try {
123                c = getTransactionContext();
124                return getAdapter().doGetDestinations(c);
125            } catch (IOException e) {
126                return emptyDestinationSet();
127            } catch (SQLException e) {
128                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
129                return emptyDestinationSet();
130            } finally {
131                if (c != null) {
132                    try {
133                        c.close();
134                    } catch (Throwable e) {
135                    }
136                }
137            }
138        }
139    
140        @SuppressWarnings("unchecked")
141        private Set<ActiveMQDestination> emptyDestinationSet() {
142            return Collections.EMPTY_SET;
143        }
144        
145        protected void createMessageAudit() {
146            if (enableAudit && audit == null) {
147                audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
148                TransactionContext c = null;
149                
150                try {
151                    c = getTransactionContext();
152                    getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
153                        public void messageId(MessageId id) {
154                            audit.isDuplicate(id);
155                        }
156                    });
157                } catch (Exception e) {
158                    LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
159                } finally {
160                    if (c != null) {
161                        try {
162                            c.close();
163                        } catch (Throwable e) {
164                        }
165                    }
166                }
167            }
168        }
169        
170        public void initSequenceIdGenerator() {
171            TransactionContext c = null;
172            try {
173                c = getTransactionContext();
174                getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
175                    public void messageId(MessageId id) {
176                        audit.isDuplicate(id);
177                    }
178                });
179            } catch (Exception e) {
180                LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
181            } finally {
182                if (c != null) {
183                    try {
184                        c.close();
185                    } catch (Throwable e) {
186                    }
187                }
188            }
189            
190        }
191    
192        public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
193            MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
194            if (transactionStore != null) {
195                rc = transactionStore.proxy(rc);
196            }
197            return rc;
198        }
199    
200        public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
201            TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
202            if (transactionStore != null) {
203                rc = transactionStore.proxy(rc);
204            }
205            return rc;
206        }
207    
208        /**
209         * Cleanup method to remove any state associated with the given destination
210         * @param destination Destination to forget
211         */
212        public void removeQueueMessageStore(ActiveMQQueue destination) {
213            if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) {
214                try {
215                    removeConsumerDestination(destination);
216                } catch (IOException ioe) {
217                    LOG.error("Failed to remove consumer destination: " + destination, ioe);
218                }
219            }
220        }
221    
222        private void removeConsumerDestination(ActiveMQQueue destination) throws IOException {
223            TransactionContext c = getTransactionContext();
224            try {
225                String id = destination.getQualifiedName();
226                getAdapter().doDeleteSubscription(c, destination, id, id);
227            } catch (SQLException e) {
228                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
229                throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e);
230            } finally {
231                c.close();
232            }
233        }
234    
235        /**
236         * Cleanup method to remove any state associated with the given destination
237         * No state retained.... nothing to do
238         *
239         * @param destination Destination to forget
240         */
241        public void removeTopicMessageStore(ActiveMQTopic destination) {
242        }
243    
244        public TransactionStore createTransactionStore() throws IOException {
245            if (transactionStore == null) {
246                transactionStore = new JdbcMemoryTransactionStore(this);
247            }
248            return this.transactionStore;
249        }
250    
251        public long getLastMessageBrokerSequenceId() throws IOException {
252            TransactionContext c = getTransactionContext();
253            try {
254                long seq =  getAdapter().doGetLastMessageStoreSequenceId(c);
255                sequenceGenerator.setLastSequenceId(seq);
256                long brokerSeq = 0;
257                if (seq != 0) {
258                    byte[] msg = getAdapter().doGetMessageById(c, seq);
259                    if (msg != null) {
260                        Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
261                        brokerSeq = last.getMessageId().getBrokerSequenceId();
262                    } else {
263                       LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
264                    }
265                }
266                return brokerSeq;
267            } catch (SQLException e) {
268                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
269                throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
270            } finally {
271                c.close();
272            }
273        }
274        
275        public long getLastProducerSequenceId(ProducerId id) throws IOException {
276            TransactionContext c = getTransactionContext();
277            try {
278                return getAdapter().doGetLastProducerSequenceId(c, id);
279            } catch (SQLException e) {
280                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
281                throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
282            } finally {
283                c.close();
284            }
285        }
286    
287        @Override
288        public void init() throws Exception {
289            getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
290    
291            if (isCreateTablesOnStartup()) {
292                TransactionContext transactionContext = getTransactionContext();
293                transactionContext.begin();
294                try {
295                    try {
296                        getAdapter().doCreateTables(transactionContext);
297                    } catch (SQLException e) {
298                        LOG.warn("Cannot create tables due to: " + e);
299                        JDBCPersistenceAdapter.log("Failure Details: ", e);
300                    }
301                } finally {
302                    transactionContext.commit();
303                }
304            }
305        }
306    
307        public void doStart() throws Exception {
308            // Cleanup the db periodically.
309            if (cleanupPeriod > 0) {
310                cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
311                    public void run() {
312                        cleanup();
313                    }
314                }, 0, cleanupPeriod, TimeUnit.MILLISECONDS);
315            }
316            
317            createMessageAudit();
318        }
319    
320        public synchronized void doStop(ServiceStopper stopper) throws Exception {
321            if (cleanupTicket != null) {
322                cleanupTicket.cancel(true);
323                cleanupTicket = null;
324            }
325        }
326    
327        public void cleanup() {
328            TransactionContext c = null;
329            try {
330                LOG.debug("Cleaning up old messages.");
331                c = getTransactionContext();
332                getAdapter().doDeleteOldMessages(c);
333            } catch (IOException e) {
334                LOG.warn("Old message cleanup failed due to: " + e, e);
335            } catch (SQLException e) {
336                LOG.warn("Old message cleanup failed due to: " + e);
337                JDBCPersistenceAdapter.log("Failure Details: ", e);
338            } finally {
339                if (c != null) {
340                    try {
341                        c.close();
342                    } catch (Throwable e) {
343                    }
344                }
345                LOG.debug("Cleanup done.");
346            }
347        }
348    
349        public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
350            this.clockDaemon = clockDaemon;
351        }
352    
353        public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
354            if (clockDaemon == null) {
355                clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
356                    public Thread newThread(Runnable runnable) {
357                        Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
358                        thread.setDaemon(true);
359                        return thread;
360                    }
361                });
362            }
363            return clockDaemon;
364        }
365    
366        public JDBCAdapter getAdapter() throws IOException {
367            if (adapter == null) {
368                setAdapter(createAdapter());
369            }
370            return adapter;
371        }
372    
373        /**
374         *
375         * @deprecated as of 5.7.0, replaced by {@link #getLocker()}
376         */
377        @Deprecated
378        public Locker getDatabaseLocker() throws IOException {
379            return getLocker();
380        }
381    
382        /**
383         * Sets the database locker strategy to use to lock the database on startup
384         * @throws IOException
385         *
386         * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)}
387         */
388        public void setDatabaseLocker(Locker locker) throws IOException {
389            setLocker(locker);
390        }
391    
392        public DataSource getLockDataSource() throws IOException {
393            if (lockDataSource == null) {
394                lockDataSource = getDataSource();
395                if (lockDataSource == null) {
396                    throw new IllegalArgumentException(
397                            "No dataSource property has been configured");
398                }
399            } else {
400                LOG.info("Using a separate dataSource for locking: "
401                        + lockDataSource);
402            }
403            return lockDataSource;
404        }
405        
406        public void setLockDataSource(DataSource dataSource) {
407            this.lockDataSource = dataSource;
408        }
409    
410        public BrokerService getBrokerService() {
411            return brokerService;
412        }
413    
414        public void setBrokerService(BrokerService brokerService) {
415            this.brokerService = brokerService;
416        }
417    
418        /**
419         * @throws IOException
420         */
421        protected JDBCAdapter createAdapter() throws IOException {
422           
423            adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
424           
425            // Use the default JDBC adapter if the
426            // Database type is not recognized.
427            if (adapter == null) {
428                adapter = new DefaultJDBCAdapter();
429                LOG.debug("Using default JDBC Adapter: " + adapter);
430            }
431            return adapter;
432        }
433    
434        private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
435            Object adapter = null;
436            TransactionContext c = getTransactionContext();
437            try {
438                try {
439                    // Make the filename file system safe.
440                    String dirverName = c.getConnection().getMetaData().getDriverName();
441                    dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(Locale.ENGLISH);
442    
443                    try {
444                        adapter = finder.newInstance(dirverName);
445                        LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
446                    } catch (Throwable e) {
447                        LOG.info("Database " + kind + " driver override not found for : [" + dirverName
448                                 + "].  Will use default implementation.");
449                    }
450                } catch (SQLException e) {
451                    LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
452                              + e.getMessage());
453                    JDBCPersistenceAdapter.log("Failure Details: ", e);
454                }
455            } finally {
456                c.close();
457            }
458            return adapter;
459        }
460    
461        public void setAdapter(JDBCAdapter adapter) {
462            this.adapter = adapter;
463            this.adapter.setStatements(getStatements());
464            this.adapter.setMaxRows(getMaxRows());
465        }
466    
467        public WireFormat getWireFormat() {
468            return wireFormat;
469        }
470    
471        public void setWireFormat(WireFormat wireFormat) {
472            this.wireFormat = wireFormat;
473        }
474    
475        public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
476            if (context == null) {
477                return getTransactionContext();
478            } else {
479                TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
480                if (answer == null) {
481                    answer = getTransactionContext();
482                    context.setLongTermStoreContext(answer);
483                }
484                return answer;
485            }
486        }
487    
488        public TransactionContext getTransactionContext() throws IOException {
489            TransactionContext answer = new TransactionContext(this);
490            if (transactionIsolation > 0) {
491                answer.setTransactionIsolation(transactionIsolation);
492            }
493            return answer;
494        }
495    
496        public void beginTransaction(ConnectionContext context) throws IOException {
497            TransactionContext transactionContext = getTransactionContext(context);
498            transactionContext.begin();
499        }
500    
501        public void commitTransaction(ConnectionContext context) throws IOException {
502            TransactionContext transactionContext = getTransactionContext(context);
503            transactionContext.commit();
504        }
505    
506        public void rollbackTransaction(ConnectionContext context) throws IOException {
507            TransactionContext transactionContext = getTransactionContext(context);
508            transactionContext.rollback();
509        }
510    
511        public int getCleanupPeriod() {
512            return cleanupPeriod;
513        }
514    
515        /**
516         * Sets the number of milliseconds until the database is attempted to be
517         * cleaned up for durable topics
518         */
519        public void setCleanupPeriod(int cleanupPeriod) {
520            this.cleanupPeriod = cleanupPeriod;
521        }
522    
523        public void deleteAllMessages() throws IOException {
524            TransactionContext c = getTransactionContext();
525            try {
526                getAdapter().doDropTables(c);
527                getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
528                getAdapter().doCreateTables(c);
529                LOG.info("Persistence store purged.");
530            } catch (SQLException e) {
531                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
532                throw IOExceptionSupport.create(e);
533            } finally {
534                c.close();
535            }
536        }
537    
538        public boolean isUseExternalMessageReferences() {
539            return useExternalMessageReferences;
540        }
541    
542        public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
543            this.useExternalMessageReferences = useExternalMessageReferences;
544        }
545    
546        public boolean isCreateTablesOnStartup() {
547            return createTablesOnStartup;
548        }
549    
550        /**
551         * Sets whether or not tables are created on startup
552         */
553        public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
554            this.createTablesOnStartup = createTablesOnStartup;
555        }
556    
557        /**
558         * @deprecated use {@link #setUseLock(boolean)} instead
559         *
560         * Sets whether or not an exclusive database lock should be used to enable
561         * JDBC Master/Slave. Enabled by default.
562         */
563        @Deprecated
564        public void setUseDatabaseLock(boolean useDatabaseLock) {
565            setUseLock(useDatabaseLock);
566        }
567    
568        public static void log(String msg, SQLException e) {
569            String s = msg + e.getMessage();
570            while (e.getNextException() != null) {
571                e = e.getNextException();
572                s += ", due to: " + e.getMessage();
573            }
574            LOG.warn(s, e);
575        }
576    
577        public Statements getStatements() {
578            if (statements == null) {
579                statements = new Statements();
580            }
581            return statements;
582        }
583    
584        public void setStatements(Statements statements) {
585            this.statements = statements;
586        }
587    
588        /**
589         * @param usageManager The UsageManager that is controlling the
590         *                destination's memory usage.
591         */
592        public void setUsageManager(SystemUsage usageManager) {
593        }
594    
595        public Locker createDefaultLocker() throws IOException {
596            DefaultDatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");
597            if (locker == null) {
598                locker = new DefaultDatabaseLocker();
599                LOG.debug("Using default JDBC Locker: " + locker);
600            }
601            locker.configure(this);
602            return locker;
603        }
604    
605        public void setBrokerName(String brokerName) {
606        }
607    
608        public String toString() {
609            return "JDBCPersistenceAdapter(" + super.toString() + ")";
610        }
611    
612        public void setDirectory(File dir) {
613            this.directory=dir;
614        }
615        
616        public File getDirectory(){
617            if (this.directory==null && brokerService != null){
618                this.directory=brokerService.getBrokerDataDirectory();
619            }
620            return this.directory;
621        }
622    
623        // interesting bit here is proof that DB is ok
624        public void checkpoint(boolean sync) throws IOException {
625            // by pass TransactionContext to avoid IO Exception handler
626            Connection connection = null;
627            try {
628                connection = getDataSource().getConnection();
629            } catch (SQLException e) {
630                LOG.debug("Could not get JDBC connection for checkpoint: " + e);
631                throw IOExceptionSupport.create(e);
632            } finally {
633                if (connection != null) {
634                    try {
635                        connection.close();
636                    } catch (Throwable ignored) {
637                    }
638                }
639            }
640        }
641    
642        public long size(){
643            return 0;
644        }
645    
646        /**
647         * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
648         *
649         * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
650         * not applied if DataBaseLocker is injected.
651         *
652         */
653        public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException {
654            getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval);
655        }
656        
657        /**
658         * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
659         * This allowable dirty isolation level may not be achievable in clustered DB environments
660         * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
661         * see isolation level constants in {@link java.sql.Connection}
662         * @param transactionIsolation the isolation level to use
663         */
664        public void setTransactionIsolation(int transactionIsolation) {
665            this.transactionIsolation = transactionIsolation;
666        }
667    
668            public int getMaxProducersToAudit() {
669                    return maxProducersToAudit;
670            }
671    
672            public void setMaxProducersToAudit(int maxProducersToAudit) {
673                    this.maxProducersToAudit = maxProducersToAudit;
674            }
675    
676            public int getMaxAuditDepth() {
677                    return maxAuditDepth;
678            }
679    
680            public void setMaxAuditDepth(int maxAuditDepth) {
681                    this.maxAuditDepth = maxAuditDepth;
682            }
683    
684            public boolean isEnableAudit() {
685                    return enableAudit;
686            }
687    
688            public void setEnableAudit(boolean enableAudit) {
689                    this.enableAudit = enableAudit;
690            }
691    
692        public int getAuditRecoveryDepth() {
693            return auditRecoveryDepth;
694        }
695    
696        public void setAuditRecoveryDepth(int auditRecoveryDepth) {
697            this.auditRecoveryDepth = auditRecoveryDepth;
698        }
699    
700        public long getNextSequenceId() {
701            synchronized(sequenceGenerator) {
702                return sequenceGenerator.getNextSequenceId();
703            }
704        }
705    
706        public int getMaxRows() {
707            return maxRows;
708        }
709    
710        /*
711         * the max rows return from queries, with sparse selectors this may need to be increased
712         */
713        public void setMaxRows(int maxRows) {
714            this.maxRows = maxRows;
715        }
716    
717        public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException {
718            TransactionContext c = getTransactionContext();
719            try {
720                getAdapter().doRecoverPreparedOps(c, jdbcMemoryTransactionStore);
721            } catch (SQLException e) {
722                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
723                throw IOExceptionSupport.create("Failed to recover from: " + jdbcMemoryTransactionStore + ". Reason: " + e,e);
724            } finally {
725                c.close();
726            }
727        }
728    
729        public void commitAdd(ConnectionContext context, MessageId messageId) throws IOException {
730            TransactionContext c = getTransactionContext(context);
731            try {
732                long sequence = (Long)messageId.getDataLocator();
733                getAdapter().doCommitAddOp(c, sequence);
734            } catch (SQLException e) {
735                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
736                throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);
737            } finally {
738                c.close();
739            }
740        }
741    
742        public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException {
743            TransactionContext c = getTransactionContext(context);
744            try {
745                getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getDataLocator(), null);
746            } catch (SQLException e) {
747                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
748                throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e);
749            } finally {
750                c.close();
751            }
752        }
753    
754    
755        public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
756            TransactionContext c = getTransactionContext(context);
757            try {
758                getAdapter().doSetLastAck(c, destination, null, clientId, subName, xidLastAck, priority);
759            } catch (SQLException e) {
760                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
761                throw IOExceptionSupport.create("Failed to commit last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e,e);
762            } finally {
763                c.close();
764            }
765        }
766    
767        public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException {
768            TransactionContext c = getTransactionContext(context);
769            try {
770                byte priority = (byte) store.getCachedStoreSequenceId(c, store.getDestination(), ack.getLastMessageId())[1];
771                getAdapter().doClearLastAck(c, store.getDestination(), priority, clientId, subName);
772            } catch (SQLException e) {
773                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
774                throw IOExceptionSupport.create("Failed to rollback last ack: " + ack + " on " +  store.getDestination() + " for " + subName + ":" + clientId + ". Reason: " + e,e);
775            } finally {
776                c.close();
777            }
778        }
779    
780        // after recovery there is no record of the original messageId for the ack
781        public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
782            TransactionContext c = getTransactionContext(context);
783            try {
784                getAdapter().doClearLastAck(c, destination, priority, clientId, subName);
785            } catch (SQLException e) {
786                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
787                throw IOExceptionSupport.create("Failed to rollback last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e, e);
788            } finally {
789                c.close();
790            }
791        }
792    
793        long[] getStoreSequenceIdForMessageId(MessageId messageId, ActiveMQDestination destination) throws IOException {
794            long[] result = new long[]{-1, Byte.MAX_VALUE -1};
795            TransactionContext c = getTransactionContext();
796            try {
797                result = adapter.getStoreSequenceId(c, destination, messageId);
798            } catch (SQLException e) {
799                JDBCPersistenceAdapter.log("JDBC Failure: ", e);
800                throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
801            } finally {
802                c.close();
803            }
804            return result;
805        }
806    
807    }