001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.File;
020 import java.io.IOException;
021 import java.sql.Connection;
022 import java.sql.SQLException;
023 import java.util.Collections;
024 import java.util.Locale;
025 import java.util.Set;
026 import java.util.concurrent.ScheduledFuture;
027 import java.util.concurrent.ScheduledThreadPoolExecutor;
028 import java.util.concurrent.ThreadFactory;
029 import java.util.concurrent.TimeUnit;
030
031 import javax.sql.DataSource;
032
033 import org.apache.activemq.ActiveMQMessageAudit;
034 import org.apache.activemq.broker.BrokerService;
035 import org.apache.activemq.broker.BrokerServiceAware;
036 import org.apache.activemq.broker.ConnectionContext;
037 import org.apache.activemq.command.ActiveMQDestination;
038 import org.apache.activemq.command.ActiveMQQueue;
039 import org.apache.activemq.command.ActiveMQTopic;
040 import org.apache.activemq.command.Message;
041 import org.apache.activemq.command.MessageAck;
042 import org.apache.activemq.command.MessageId;
043 import org.apache.activemq.command.ProducerId;
044 import org.apache.activemq.openwire.OpenWireFormat;
045 import org.apache.activemq.broker.Locker;
046 import org.apache.activemq.store.MessageStore;
047 import org.apache.activemq.store.PersistenceAdapter;
048 import org.apache.activemq.store.TopicMessageStore;
049 import org.apache.activemq.store.TransactionStore;
050 import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
051 import org.apache.activemq.store.memory.MemoryTransactionStore;
052 import org.apache.activemq.usage.SystemUsage;
053 import org.apache.activemq.util.ByteSequence;
054 import org.apache.activemq.util.FactoryFinder;
055 import org.apache.activemq.util.IOExceptionSupport;
056 import org.apache.activemq.util.LongSequenceGenerator;
057 import org.apache.activemq.util.ServiceStopper;
058 import org.apache.activemq.wireformat.WireFormat;
059 import org.slf4j.Logger;
060 import org.slf4j.LoggerFactory;
061
062 /**
063 * A {@link PersistenceAdapter} implementation using JDBC for persistence
064 * storage.
065 *
066 * This persistence adapter will correctly remember prepared XA transactions,
067 * but it will not keep track of local transaction commits so that operations
068 * performed against the Message store are done as a single uow.
069 *
070 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter"
071 *
072 *
073 */
074 public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter,
075 BrokerServiceAware {
076
077 private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class);
078 private static FactoryFinder adapterFactoryFinder = new FactoryFinder(
079 "META-INF/services/org/apache/activemq/store/jdbc/");
080 private static FactoryFinder lockFactoryFinder = new FactoryFinder(
081 "META-INF/services/org/apache/activemq/store/jdbc/lock/");
082
083 public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000;
084
085 private WireFormat wireFormat = new OpenWireFormat();
086 private BrokerService brokerService;
087 private Statements statements;
088 private JDBCAdapter adapter;
089 private MemoryTransactionStore transactionStore;
090 private ScheduledThreadPoolExecutor clockDaemon;
091 private ScheduledFuture<?> cleanupTicket;
092 private int cleanupPeriod = 1000 * 60 * 5;
093 private boolean useExternalMessageReferences;
094 private boolean createTablesOnStartup = true;
095 private DataSource lockDataSource;
096 private int transactionIsolation;
097 private File directory;
098
099 protected int maxProducersToAudit=1024;
100 protected int maxAuditDepth=1000;
101 protected boolean enableAudit=false;
102 protected int auditRecoveryDepth = 1024;
103 protected ActiveMQMessageAudit audit;
104
105 protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
106 protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
107
108 {
109 setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD);
110 }
111
112 public JDBCPersistenceAdapter() {
113 }
114
115 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
116 super(ds);
117 this.wireFormat = wireFormat;
118 }
119
120 public Set<ActiveMQDestination> getDestinations() {
121 TransactionContext c = null;
122 try {
123 c = getTransactionContext();
124 return getAdapter().doGetDestinations(c);
125 } catch (IOException e) {
126 return emptyDestinationSet();
127 } catch (SQLException e) {
128 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
129 return emptyDestinationSet();
130 } finally {
131 if (c != null) {
132 try {
133 c.close();
134 } catch (Throwable e) {
135 }
136 }
137 }
138 }
139
140 @SuppressWarnings("unchecked")
141 private Set<ActiveMQDestination> emptyDestinationSet() {
142 return Collections.EMPTY_SET;
143 }
144
145 protected void createMessageAudit() {
146 if (enableAudit && audit == null) {
147 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
148 TransactionContext c = null;
149
150 try {
151 c = getTransactionContext();
152 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
153 public void messageId(MessageId id) {
154 audit.isDuplicate(id);
155 }
156 });
157 } catch (Exception e) {
158 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
159 } finally {
160 if (c != null) {
161 try {
162 c.close();
163 } catch (Throwable e) {
164 }
165 }
166 }
167 }
168 }
169
170 public void initSequenceIdGenerator() {
171 TransactionContext c = null;
172 try {
173 c = getTransactionContext();
174 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() {
175 public void messageId(MessageId id) {
176 audit.isDuplicate(id);
177 }
178 });
179 } catch (Exception e) {
180 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e);
181 } finally {
182 if (c != null) {
183 try {
184 c.close();
185 } catch (Throwable e) {
186 }
187 }
188 }
189
190 }
191
192 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
193 MessageStore rc = new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit);
194 if (transactionStore != null) {
195 rc = transactionStore.proxy(rc);
196 }
197 return rc;
198 }
199
200 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
201 TopicMessageStore rc = new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit);
202 if (transactionStore != null) {
203 rc = transactionStore.proxy(rc);
204 }
205 return rc;
206 }
207
208 /**
209 * Cleanup method to remove any state associated with the given destination
210 * @param destination Destination to forget
211 */
212 public void removeQueueMessageStore(ActiveMQQueue destination) {
213 if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) {
214 try {
215 removeConsumerDestination(destination);
216 } catch (IOException ioe) {
217 LOG.error("Failed to remove consumer destination: " + destination, ioe);
218 }
219 }
220 }
221
222 private void removeConsumerDestination(ActiveMQQueue destination) throws IOException {
223 TransactionContext c = getTransactionContext();
224 try {
225 String id = destination.getQualifiedName();
226 getAdapter().doDeleteSubscription(c, destination, id, id);
227 } catch (SQLException e) {
228 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
229 throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e);
230 } finally {
231 c.close();
232 }
233 }
234
235 /**
236 * Cleanup method to remove any state associated with the given destination
237 * No state retained.... nothing to do
238 *
239 * @param destination Destination to forget
240 */
241 public void removeTopicMessageStore(ActiveMQTopic destination) {
242 }
243
244 public TransactionStore createTransactionStore() throws IOException {
245 if (transactionStore == null) {
246 transactionStore = new JdbcMemoryTransactionStore(this);
247 }
248 return this.transactionStore;
249 }
250
251 public long getLastMessageBrokerSequenceId() throws IOException {
252 TransactionContext c = getTransactionContext();
253 try {
254 long seq = getAdapter().doGetLastMessageStoreSequenceId(c);
255 sequenceGenerator.setLastSequenceId(seq);
256 long brokerSeq = 0;
257 if (seq != 0) {
258 byte[] msg = getAdapter().doGetMessageById(c, seq);
259 if (msg != null) {
260 Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg));
261 brokerSeq = last.getMessageId().getBrokerSequenceId();
262 } else {
263 LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!");
264 }
265 }
266 return brokerSeq;
267 } catch (SQLException e) {
268 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
269 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
270 } finally {
271 c.close();
272 }
273 }
274
275 public long getLastProducerSequenceId(ProducerId id) throws IOException {
276 TransactionContext c = getTransactionContext();
277 try {
278 return getAdapter().doGetLastProducerSequenceId(c, id);
279 } catch (SQLException e) {
280 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
281 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
282 } finally {
283 c.close();
284 }
285 }
286
287 @Override
288 public void init() throws Exception {
289 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
290
291 if (isCreateTablesOnStartup()) {
292 TransactionContext transactionContext = getTransactionContext();
293 transactionContext.begin();
294 try {
295 try {
296 getAdapter().doCreateTables(transactionContext);
297 } catch (SQLException e) {
298 LOG.warn("Cannot create tables due to: " + e);
299 JDBCPersistenceAdapter.log("Failure Details: ", e);
300 }
301 } finally {
302 transactionContext.commit();
303 }
304 }
305 }
306
307 public void doStart() throws Exception {
308 // Cleanup the db periodically.
309 if (cleanupPeriod > 0) {
310 cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() {
311 public void run() {
312 cleanup();
313 }
314 }, 0, cleanupPeriod, TimeUnit.MILLISECONDS);
315 }
316
317 createMessageAudit();
318 }
319
320 public synchronized void doStop(ServiceStopper stopper) throws Exception {
321 if (cleanupTicket != null) {
322 cleanupTicket.cancel(true);
323 cleanupTicket = null;
324 }
325 }
326
327 public void cleanup() {
328 TransactionContext c = null;
329 try {
330 LOG.debug("Cleaning up old messages.");
331 c = getTransactionContext();
332 getAdapter().doDeleteOldMessages(c);
333 } catch (IOException e) {
334 LOG.warn("Old message cleanup failed due to: " + e, e);
335 } catch (SQLException e) {
336 LOG.warn("Old message cleanup failed due to: " + e);
337 JDBCPersistenceAdapter.log("Failure Details: ", e);
338 } finally {
339 if (c != null) {
340 try {
341 c.close();
342 } catch (Throwable e) {
343 }
344 }
345 LOG.debug("Cleanup done.");
346 }
347 }
348
349 public void setScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor clockDaemon) {
350 this.clockDaemon = clockDaemon;
351 }
352
353 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
354 if (clockDaemon == null) {
355 clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
356 public Thread newThread(Runnable runnable) {
357 Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
358 thread.setDaemon(true);
359 return thread;
360 }
361 });
362 }
363 return clockDaemon;
364 }
365
366 public JDBCAdapter getAdapter() throws IOException {
367 if (adapter == null) {
368 setAdapter(createAdapter());
369 }
370 return adapter;
371 }
372
373 /**
374 *
375 * @deprecated as of 5.7.0, replaced by {@link #getLocker()}
376 */
377 @Deprecated
378 public Locker getDatabaseLocker() throws IOException {
379 return getLocker();
380 }
381
382 /**
383 * Sets the database locker strategy to use to lock the database on startup
384 * @throws IOException
385 *
386 * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)}
387 */
388 public void setDatabaseLocker(Locker locker) throws IOException {
389 setLocker(locker);
390 }
391
392 public DataSource getLockDataSource() throws IOException {
393 if (lockDataSource == null) {
394 lockDataSource = getDataSource();
395 if (lockDataSource == null) {
396 throw new IllegalArgumentException(
397 "No dataSource property has been configured");
398 }
399 } else {
400 LOG.info("Using a separate dataSource for locking: "
401 + lockDataSource);
402 }
403 return lockDataSource;
404 }
405
406 public void setLockDataSource(DataSource dataSource) {
407 this.lockDataSource = dataSource;
408 }
409
410 public BrokerService getBrokerService() {
411 return brokerService;
412 }
413
414 public void setBrokerService(BrokerService brokerService) {
415 this.brokerService = brokerService;
416 }
417
418 /**
419 * @throws IOException
420 */
421 protected JDBCAdapter createAdapter() throws IOException {
422
423 adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter");
424
425 // Use the default JDBC adapter if the
426 // Database type is not recognized.
427 if (adapter == null) {
428 adapter = new DefaultJDBCAdapter();
429 LOG.debug("Using default JDBC Adapter: " + adapter);
430 }
431 return adapter;
432 }
433
434 private Object loadAdapter(FactoryFinder finder, String kind) throws IOException {
435 Object adapter = null;
436 TransactionContext c = getTransactionContext();
437 try {
438 try {
439 // Make the filename file system safe.
440 String dirverName = c.getConnection().getMetaData().getDriverName();
441 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(Locale.ENGLISH);
442
443 try {
444 adapter = finder.newInstance(dirverName);
445 LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass());
446 } catch (Throwable e) {
447 LOG.info("Database " + kind + " driver override not found for : [" + dirverName
448 + "]. Will use default implementation.");
449 }
450 } catch (SQLException e) {
451 LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: "
452 + e.getMessage());
453 JDBCPersistenceAdapter.log("Failure Details: ", e);
454 }
455 } finally {
456 c.close();
457 }
458 return adapter;
459 }
460
461 public void setAdapter(JDBCAdapter adapter) {
462 this.adapter = adapter;
463 this.adapter.setStatements(getStatements());
464 this.adapter.setMaxRows(getMaxRows());
465 }
466
467 public WireFormat getWireFormat() {
468 return wireFormat;
469 }
470
471 public void setWireFormat(WireFormat wireFormat) {
472 this.wireFormat = wireFormat;
473 }
474
475 public TransactionContext getTransactionContext(ConnectionContext context) throws IOException {
476 if (context == null) {
477 return getTransactionContext();
478 } else {
479 TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
480 if (answer == null) {
481 answer = getTransactionContext();
482 context.setLongTermStoreContext(answer);
483 }
484 return answer;
485 }
486 }
487
488 public TransactionContext getTransactionContext() throws IOException {
489 TransactionContext answer = new TransactionContext(this);
490 if (transactionIsolation > 0) {
491 answer.setTransactionIsolation(transactionIsolation);
492 }
493 return answer;
494 }
495
496 public void beginTransaction(ConnectionContext context) throws IOException {
497 TransactionContext transactionContext = getTransactionContext(context);
498 transactionContext.begin();
499 }
500
501 public void commitTransaction(ConnectionContext context) throws IOException {
502 TransactionContext transactionContext = getTransactionContext(context);
503 transactionContext.commit();
504 }
505
506 public void rollbackTransaction(ConnectionContext context) throws IOException {
507 TransactionContext transactionContext = getTransactionContext(context);
508 transactionContext.rollback();
509 }
510
511 public int getCleanupPeriod() {
512 return cleanupPeriod;
513 }
514
515 /**
516 * Sets the number of milliseconds until the database is attempted to be
517 * cleaned up for durable topics
518 */
519 public void setCleanupPeriod(int cleanupPeriod) {
520 this.cleanupPeriod = cleanupPeriod;
521 }
522
523 public void deleteAllMessages() throws IOException {
524 TransactionContext c = getTransactionContext();
525 try {
526 getAdapter().doDropTables(c);
527 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
528 getAdapter().doCreateTables(c);
529 LOG.info("Persistence store purged.");
530 } catch (SQLException e) {
531 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
532 throw IOExceptionSupport.create(e);
533 } finally {
534 c.close();
535 }
536 }
537
538 public boolean isUseExternalMessageReferences() {
539 return useExternalMessageReferences;
540 }
541
542 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
543 this.useExternalMessageReferences = useExternalMessageReferences;
544 }
545
546 public boolean isCreateTablesOnStartup() {
547 return createTablesOnStartup;
548 }
549
550 /**
551 * Sets whether or not tables are created on startup
552 */
553 public void setCreateTablesOnStartup(boolean createTablesOnStartup) {
554 this.createTablesOnStartup = createTablesOnStartup;
555 }
556
557 /**
558 * @deprecated use {@link #setUseLock(boolean)} instead
559 *
560 * Sets whether or not an exclusive database lock should be used to enable
561 * JDBC Master/Slave. Enabled by default.
562 */
563 @Deprecated
564 public void setUseDatabaseLock(boolean useDatabaseLock) {
565 setUseLock(useDatabaseLock);
566 }
567
568 public static void log(String msg, SQLException e) {
569 String s = msg + e.getMessage();
570 while (e.getNextException() != null) {
571 e = e.getNextException();
572 s += ", due to: " + e.getMessage();
573 }
574 LOG.warn(s, e);
575 }
576
577 public Statements getStatements() {
578 if (statements == null) {
579 statements = new Statements();
580 }
581 return statements;
582 }
583
584 public void setStatements(Statements statements) {
585 this.statements = statements;
586 }
587
588 /**
589 * @param usageManager The UsageManager that is controlling the
590 * destination's memory usage.
591 */
592 public void setUsageManager(SystemUsage usageManager) {
593 }
594
595 public Locker createDefaultLocker() throws IOException {
596 DefaultDatabaseLocker locker = (DefaultDatabaseLocker) loadAdapter(lockFactoryFinder, "lock");
597 if (locker == null) {
598 locker = new DefaultDatabaseLocker();
599 LOG.debug("Using default JDBC Locker: " + locker);
600 }
601 locker.configure(this);
602 return locker;
603 }
604
605 public void setBrokerName(String brokerName) {
606 }
607
608 public String toString() {
609 return "JDBCPersistenceAdapter(" + super.toString() + ")";
610 }
611
612 public void setDirectory(File dir) {
613 this.directory=dir;
614 }
615
616 public File getDirectory(){
617 if (this.directory==null && brokerService != null){
618 this.directory=brokerService.getBrokerDataDirectory();
619 }
620 return this.directory;
621 }
622
623 // interesting bit here is proof that DB is ok
624 public void checkpoint(boolean sync) throws IOException {
625 // by pass TransactionContext to avoid IO Exception handler
626 Connection connection = null;
627 try {
628 connection = getDataSource().getConnection();
629 } catch (SQLException e) {
630 LOG.debug("Could not get JDBC connection for checkpoint: " + e);
631 throw IOExceptionSupport.create(e);
632 } finally {
633 if (connection != null) {
634 try {
635 connection.close();
636 } catch (Throwable ignored) {
637 }
638 }
639 }
640 }
641
642 public long size(){
643 return 0;
644 }
645
646 /**
647 * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
648 *
649 * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker
650 * not applied if DataBaseLocker is injected.
651 *
652 */
653 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException {
654 getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval);
655 }
656
657 /**
658 * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED
659 * This allowable dirty isolation level may not be achievable in clustered DB environments
660 * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ
661 * see isolation level constants in {@link java.sql.Connection}
662 * @param transactionIsolation the isolation level to use
663 */
664 public void setTransactionIsolation(int transactionIsolation) {
665 this.transactionIsolation = transactionIsolation;
666 }
667
668 public int getMaxProducersToAudit() {
669 return maxProducersToAudit;
670 }
671
672 public void setMaxProducersToAudit(int maxProducersToAudit) {
673 this.maxProducersToAudit = maxProducersToAudit;
674 }
675
676 public int getMaxAuditDepth() {
677 return maxAuditDepth;
678 }
679
680 public void setMaxAuditDepth(int maxAuditDepth) {
681 this.maxAuditDepth = maxAuditDepth;
682 }
683
684 public boolean isEnableAudit() {
685 return enableAudit;
686 }
687
688 public void setEnableAudit(boolean enableAudit) {
689 this.enableAudit = enableAudit;
690 }
691
692 public int getAuditRecoveryDepth() {
693 return auditRecoveryDepth;
694 }
695
696 public void setAuditRecoveryDepth(int auditRecoveryDepth) {
697 this.auditRecoveryDepth = auditRecoveryDepth;
698 }
699
700 public long getNextSequenceId() {
701 synchronized(sequenceGenerator) {
702 return sequenceGenerator.getNextSequenceId();
703 }
704 }
705
706 public int getMaxRows() {
707 return maxRows;
708 }
709
710 /*
711 * the max rows return from queries, with sparse selectors this may need to be increased
712 */
713 public void setMaxRows(int maxRows) {
714 this.maxRows = maxRows;
715 }
716
717 public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException {
718 TransactionContext c = getTransactionContext();
719 try {
720 getAdapter().doRecoverPreparedOps(c, jdbcMemoryTransactionStore);
721 } catch (SQLException e) {
722 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
723 throw IOExceptionSupport.create("Failed to recover from: " + jdbcMemoryTransactionStore + ". Reason: " + e,e);
724 } finally {
725 c.close();
726 }
727 }
728
729 public void commitAdd(ConnectionContext context, MessageId messageId) throws IOException {
730 TransactionContext c = getTransactionContext(context);
731 try {
732 long sequence = (Long)messageId.getDataLocator();
733 getAdapter().doCommitAddOp(c, sequence);
734 } catch (SQLException e) {
735 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
736 throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e);
737 } finally {
738 c.close();
739 }
740 }
741
742 public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException {
743 TransactionContext c = getTransactionContext(context);
744 try {
745 getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getDataLocator(), null);
746 } catch (SQLException e) {
747 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
748 throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e);
749 } finally {
750 c.close();
751 }
752 }
753
754
755 public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
756 TransactionContext c = getTransactionContext(context);
757 try {
758 getAdapter().doSetLastAck(c, destination, null, clientId, subName, xidLastAck, priority);
759 } catch (SQLException e) {
760 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
761 throw IOExceptionSupport.create("Failed to commit last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e,e);
762 } finally {
763 c.close();
764 }
765 }
766
767 public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException {
768 TransactionContext c = getTransactionContext(context);
769 try {
770 byte priority = (byte) store.getCachedStoreSequenceId(c, store.getDestination(), ack.getLastMessageId())[1];
771 getAdapter().doClearLastAck(c, store.getDestination(), priority, clientId, subName);
772 } catch (SQLException e) {
773 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
774 throw IOExceptionSupport.create("Failed to rollback last ack: " + ack + " on " + store.getDestination() + " for " + subName + ":" + clientId + ". Reason: " + e,e);
775 } finally {
776 c.close();
777 }
778 }
779
780 // after recovery there is no record of the original messageId for the ack
781 public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException {
782 TransactionContext c = getTransactionContext(context);
783 try {
784 getAdapter().doClearLastAck(c, destination, priority, clientId, subName);
785 } catch (SQLException e) {
786 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
787 throw IOExceptionSupport.create("Failed to rollback last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e, e);
788 } finally {
789 c.close();
790 }
791 }
792
793 long[] getStoreSequenceIdForMessageId(MessageId messageId, ActiveMQDestination destination) throws IOException {
794 long[] result = new long[]{-1, Byte.MAX_VALUE -1};
795 TransactionContext c = getTransactionContext();
796 try {
797 result = adapter.getStoreSequenceId(c, destination, messageId);
798 } catch (SQLException e) {
799 JDBCPersistenceAdapter.log("JDBC Failure: ", e);
800 throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
801 } finally {
802 c.close();
803 }
804 return result;
805 }
806
807 }